streamweave_tempfile/
tempfile_consumer.rs

1use async_trait::async_trait;
2use futures::{Stream, StreamExt};
3use std::path::PathBuf;
4use std::pin::Pin;
5use streamweave::{Consumer, ConsumerConfig, Input};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7use tokio::fs::File;
8use tokio::io::AsyncWriteExt;
9use tracing::warn;
10
11/// A consumer that writes items to a temporary file.
12///
13/// The temporary file is managed externally and will be cleaned up when dropped.
14pub struct TempFileConsumer {
15  /// The temporary file path
16  pub path: PathBuf,
17  /// Configuration for the consumer
18  pub config: ConsumerConfig<String>,
19}
20
21impl TempFileConsumer {
22  /// Creates a new `TempFileConsumer` from a temporary file path.
23  ///
24  /// # Arguments
25  ///
26  /// * `path` - Path to the temporary file
27  pub fn new(path: PathBuf) -> Self {
28    Self {
29      path,
30      config: ConsumerConfig::default(),
31    }
32  }
33
34  /// Sets the error handling strategy.
35  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<String>) -> Self {
36    self.config.error_strategy = strategy;
37    self
38  }
39
40  /// Sets the name for this consumer.
41  pub fn with_name(mut self, name: String) -> Self {
42    self.config.name = name;
43    self
44  }
45}
46
47// Trait implementations for TempFileConsumer
48
49impl Input for TempFileConsumer {
50  type Input = String;
51  type InputStream = Pin<Box<dyn Stream<Item = String> + Send>>;
52}
53
54#[async_trait]
55impl Consumer for TempFileConsumer {
56  type InputPorts = (String,);
57
58  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
59    let component_name = self.config.name.clone();
60    let path = self.path.clone();
61
62    match File::create(&path).await {
63      Ok(mut file) => {
64        while let Some(value) = stream.next().await {
65          if let Err(e) = file.write_all(value.as_bytes()).await {
66            warn!(
67              component = %component_name,
68              path = %path.display(),
69              error = %e,
70              "Failed to write to temp file"
71            );
72          }
73          if let Err(e) = file.write_all(b"\n").await {
74            warn!(
75              component = %component_name,
76              path = %path.display(),
77              error = %e,
78              "Failed to write newline to temp file"
79            );
80          }
81        }
82        if let Err(e) = file.flush().await {
83          warn!(
84            component = %component_name,
85            path = %path.display(),
86            error = %e,
87            "Failed to flush temp file"
88          );
89        }
90      }
91      Err(e) => {
92        warn!(
93          component = %component_name,
94          path = %path.display(),
95          error = %e,
96          "Failed to create temp file, all items will be dropped"
97        );
98      }
99    }
100  }
101
102  fn set_config_impl(&mut self, config: ConsumerConfig<String>) {
103    self.config = config;
104  }
105
106  fn get_config_impl(&self) -> &ConsumerConfig<String> {
107    &self.config
108  }
109
110  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<String> {
111    &mut self.config
112  }
113
114  fn handle_error(&self, error: &StreamError<String>) -> ErrorAction {
115    match self.config.error_strategy {
116      ErrorStrategy::Stop => ErrorAction::Stop,
117      ErrorStrategy::Skip => ErrorAction::Skip,
118      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
119      ErrorStrategy::Custom(ref handler) => handler(error),
120      _ => ErrorAction::Stop,
121    }
122  }
123
124  fn create_error_context(&self, item: Option<String>) -> ErrorContext<String> {
125    ErrorContext {
126      timestamp: chrono::Utc::now(),
127      item,
128      component_name: self.config.name.clone(),
129      component_type: std::any::type_name::<Self>().to_string(),
130    }
131  }
132
133  fn component_info(&self) -> ComponentInfo {
134    ComponentInfo {
135      name: self.config.name.clone(),
136      type_name: std::any::type_name::<Self>().to_string(),
137    }
138  }
139}