streamweave_tempfile/
tempfile_consumer.rs1use async_trait::async_trait;
2use futures::{Stream, StreamExt};
3use std::path::PathBuf;
4use std::pin::Pin;
5use streamweave::{Consumer, ConsumerConfig, Input};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7use tokio::fs::File;
8use tokio::io::AsyncWriteExt;
9use tracing::warn;
10
11pub struct TempFileConsumer {
15 pub path: PathBuf,
17 pub config: ConsumerConfig<String>,
19}
20
21impl TempFileConsumer {
22 pub fn new(path: PathBuf) -> Self {
28 Self {
29 path,
30 config: ConsumerConfig::default(),
31 }
32 }
33
34 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<String>) -> Self {
36 self.config.error_strategy = strategy;
37 self
38 }
39
40 pub fn with_name(mut self, name: String) -> Self {
42 self.config.name = name;
43 self
44 }
45}
46
47impl Input for TempFileConsumer {
50 type Input = String;
51 type InputStream = Pin<Box<dyn Stream<Item = String> + Send>>;
52}
53
54#[async_trait]
55impl Consumer for TempFileConsumer {
56 type InputPorts = (String,);
57
58 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
59 let component_name = self.config.name.clone();
60 let path = self.path.clone();
61
62 match File::create(&path).await {
63 Ok(mut file) => {
64 while let Some(value) = stream.next().await {
65 if let Err(e) = file.write_all(value.as_bytes()).await {
66 warn!(
67 component = %component_name,
68 path = %path.display(),
69 error = %e,
70 "Failed to write to temp file"
71 );
72 }
73 if let Err(e) = file.write_all(b"\n").await {
74 warn!(
75 component = %component_name,
76 path = %path.display(),
77 error = %e,
78 "Failed to write newline to temp file"
79 );
80 }
81 }
82 if let Err(e) = file.flush().await {
83 warn!(
84 component = %component_name,
85 path = %path.display(),
86 error = %e,
87 "Failed to flush temp file"
88 );
89 }
90 }
91 Err(e) => {
92 warn!(
93 component = %component_name,
94 path = %path.display(),
95 error = %e,
96 "Failed to create temp file, all items will be dropped"
97 );
98 }
99 }
100 }
101
102 fn set_config_impl(&mut self, config: ConsumerConfig<String>) {
103 self.config = config;
104 }
105
106 fn get_config_impl(&self) -> &ConsumerConfig<String> {
107 &self.config
108 }
109
110 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<String> {
111 &mut self.config
112 }
113
114 fn handle_error(&self, error: &StreamError<String>) -> ErrorAction {
115 match self.config.error_strategy {
116 ErrorStrategy::Stop => ErrorAction::Stop,
117 ErrorStrategy::Skip => ErrorAction::Skip,
118 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
119 ErrorStrategy::Custom(ref handler) => handler(error),
120 _ => ErrorAction::Stop,
121 }
122 }
123
124 fn create_error_context(&self, item: Option<String>) -> ErrorContext<String> {
125 ErrorContext {
126 timestamp: chrono::Utc::now(),
127 item,
128 component_name: self.config.name.clone(),
129 component_type: std::any::type_name::<Self>().to_string(),
130 }
131 }
132
133 fn component_info(&self) -> ComponentInfo {
134 ComponentInfo {
135 name: self.config.name.clone(),
136 type_name: std::any::type_name::<Self>().to_string(),
137 }
138 }
139}