use async_stream::stream;
use futures_util::stream::StreamExt;
use rs2_stream::rs2::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use tokio::runtime::Runtime;
async fn acquire_resource() -> PathBuf {
println!("Resource acquired: data.txt");
PathBuf::from("data.txt")
}
async fn release_resource(path: PathBuf, exit_case: ExitCase<String>) {
println!(
"Resource released: {} with exit case: {:?}",
path.display(),
exit_case
);
}
fn use_resource(path: PathBuf) -> RS2Stream<Result<String, String>> {
let file = match File::open(&path) {
Ok(file) => file,
Err(e) => {
let error = format!("Error opening file: {}", e);
println!("{}", error);
return from_iter(vec![Err(error)]);
}
};
let reader = BufReader::new(file);
let lines = reader.lines();
stream! {
for (i, line_result) in lines.enumerate() {
match line_result {
Ok(line) => {
if line.contains("ERROR") {
yield Err(format!("Error in line {}: {}", i + 1, line));
} else {
yield Ok(line);
}
},
Err(e) => yield Err(format!("IO error: {}", e)),
}
}
}
.boxed()
}
fn main() {
std::fs::write("data.txt", "Line 1\nLine 2\nERROR in Line 3\nLine 4\n")
.expect("Failed to create data.txt");
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("Demonstrating bracket_case extension method for resource management");
let stream = from_iter(vec![Ok::<String, String>("Initial value".to_string())]);
let result = stream
.bracket_case_rs2(
acquire_resource(),
|resource| use_resource(resource),
release_resource,
)
.collect::<Vec<_>>()
.await;
println!("\nResults:");
for (i, res) in result.iter().enumerate() {
match res {
Ok(value) => println!(" {}. Success: {}", i + 1, value),
Err(error) => println!(" {}. Error: {}", i + 1, error),
}
}
println!("\nNote: The resource is properly released regardless of errors in the stream");
});
std::fs::remove_file("data.txt").expect("Failed to remove data.txt");
}