streamweave_tempfile/consumers/
consumer.rs1use super::tempfile_consumer::TempFileConsumer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6use tokio::fs::File;
7use tokio::io::AsyncWriteExt;
8use tracing::warn;
9
10#[async_trait]
11impl Consumer for TempFileConsumer {
12 type InputPorts = (String,);
13
14 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
15 let component_name = self.config.name.clone();
16 let path = self.path.clone();
17
18 match File::create(&path).await {
19 Ok(mut file) => {
20 while let Some(value) = stream.next().await {
21 if let Err(e) = file.write_all(value.as_bytes()).await {
22 warn!(
23 component = %component_name,
24 path = %path.display(),
25 error = %e,
26 "Failed to write to temp file"
27 );
28 }
29 if let Err(e) = file.write_all(b"\n").await {
30 warn!(
31 component = %component_name,
32 path = %path.display(),
33 error = %e,
34 "Failed to write newline to temp file"
35 );
36 }
37 }
38 if let Err(e) = file.flush().await {
39 warn!(
40 component = %component_name,
41 path = %path.display(),
42 error = %e,
43 "Failed to flush temp file"
44 );
45 }
46 }
47 Err(e) => {
48 warn!(
49 component = %component_name,
50 path = %path.display(),
51 error = %e,
52 "Failed to create temp file, all items will be dropped"
53 );
54 }
55 }
56 }
57
58 fn set_config_impl(&mut self, config: ConsumerConfig<String>) {
59 self.config = config;
60 }
61
62 fn get_config_impl(&self) -> &ConsumerConfig<String> {
63 &self.config
64 }
65
66 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<String> {
67 &mut self.config
68 }
69
70 fn handle_error(&self, error: &StreamError<String>) -> ErrorAction {
71 match self.config.error_strategy {
72 ErrorStrategy::Stop => ErrorAction::Stop,
73 ErrorStrategy::Skip => ErrorAction::Skip,
74 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
75 ErrorStrategy::Custom(ref handler) => handler(error),
76 _ => ErrorAction::Stop,
77 }
78 }
79
80 fn create_error_context(&self, item: Option<String>) -> ErrorContext<String> {
81 ErrorContext {
82 timestamp: chrono::Utc::now(),
83 item,
84 component_name: self.config.name.clone(),
85 component_type: std::any::type_name::<Self>().to_string(),
86 }
87 }
88
89 fn component_info(&self) -> ComponentInfo {
90 ComponentInfo {
91 name: self.config.name.clone(),
92 type_name: std::any::type_name::<Self>().to_string(),
93 }
94 }
95}