streamweave_tempfile/consumers/
consumer.rs

1use super::tempfile_consumer::TempFileConsumer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6use tokio::fs::File;
7use tokio::io::AsyncWriteExt;
8use tracing::warn;
9
10#[async_trait]
11impl Consumer for TempFileConsumer {
12  type InputPorts = (String,);
13
14  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
15    let component_name = self.config.name.clone();
16    let path = self.path.clone();
17
18    match File::create(&path).await {
19      Ok(mut file) => {
20        while let Some(value) = stream.next().await {
21          if let Err(e) = file.write_all(value.as_bytes()).await {
22            warn!(
23              component = %component_name,
24              path = %path.display(),
25              error = %e,
26              "Failed to write to temp file"
27            );
28          }
29          if let Err(e) = file.write_all(b"\n").await {
30            warn!(
31              component = %component_name,
32              path = %path.display(),
33              error = %e,
34              "Failed to write newline to temp file"
35            );
36          }
37        }
38        if let Err(e) = file.flush().await {
39          warn!(
40            component = %component_name,
41            path = %path.display(),
42            error = %e,
43            "Failed to flush temp file"
44          );
45        }
46      }
47      Err(e) => {
48        warn!(
49          component = %component_name,
50          path = %path.display(),
51          error = %e,
52          "Failed to create temp file, all items will be dropped"
53        );
54      }
55    }
56  }
57
58  fn set_config_impl(&mut self, config: ConsumerConfig<String>) {
59    self.config = config;
60  }
61
62  fn get_config_impl(&self) -> &ConsumerConfig<String> {
63    &self.config
64  }
65
66  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<String> {
67    &mut self.config
68  }
69
70  fn handle_error(&self, error: &StreamError<String>) -> ErrorAction {
71    match self.config.error_strategy {
72      ErrorStrategy::Stop => ErrorAction::Stop,
73      ErrorStrategy::Skip => ErrorAction::Skip,
74      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
75      ErrorStrategy::Custom(ref handler) => handler(error),
76      _ => ErrorAction::Stop,
77    }
78  }
79
80  fn create_error_context(&self, item: Option<String>) -> ErrorContext<String> {
81    ErrorContext {
82      timestamp: chrono::Utc::now(),
83      item,
84      component_name: self.config.name.clone(),
85      component_type: std::any::type_name::<Self>().to_string(),
86    }
87  }
88
89  fn component_info(&self) -> ComponentInfo {
90    ComponentInfo {
91      name: self.config.name.clone(),
92      type_name: std::any::type_name::<Self>().to_string(),
93    }
94  }
95}