pipebase/collect/
text.rs

1use serde::Deserialize;
2use tokio::time::Interval;
3
4use super::Collect;
5use crate::common::{ConfigInto, FromConfig, FromPath, Period, Render};
6use async_trait::async_trait;
7
8#[derive(Deserialize)]
9pub struct TextCollectorConfig {
10    flush_period: Period,
11    separator: String,
12}
13
14impl FromPath for TextCollectorConfig {}
15
16impl ConfigInto<TextCollector> for TextCollectorConfig {}
17pub struct TextCollector {
18    buffer: String,
19    flush_period: Period,
20    separator: String,
21}
22
23#[async_trait]
24impl FromConfig<TextCollectorConfig> for TextCollector {
25    async fn from_config(config: TextCollectorConfig) -> anyhow::Result<Self> {
26        Ok(TextCollector {
27            buffer: String::new(),
28            flush_period: config.flush_period,
29            separator: config.separator,
30        })
31    }
32}
33
34/// # Parameters
35/// * T: input
36/// * String: output
37#[async_trait]
38impl<T> Collect<T, String, TextCollectorConfig> for TextCollector
39where
40    T: Render + Send + 'static,
41{
42    async fn collect(&mut self, t: T) -> anyhow::Result<()> {
43        let text = t.render();
44        self.buffer.push_str(&text);
45        self.buffer.push_str(&self.separator);
46        Ok(())
47    }
48
49    async fn flush(&mut self) -> anyhow::Result<Option<String>> {
50        let buffer = self.buffer.clone();
51        self.buffer.clear();
52        if buffer.is_empty() {
53            return Ok(None);
54        }
55        Ok(Some(buffer))
56    }
57
58    fn get_flush_interval(&self) -> Interval {
59        let flush_period = self.flush_period.clone();
60        tokio::time::interval(flush_period.into())
61    }
62}