fluxus_api/io/
collection_sink.rs

1use async_trait::async_trait;
2use fluxus_sinks::Sink;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::{Arc, Mutex};
5
/// A sink that collects elements into a `Vec`.
///
/// The buffer lives behind an `Arc<Mutex<_>>`, so cloning the sink yields another
/// handle to the same buffer rather than a copy of its contents.
pub struct CollectionSink<T> {
    data: Arc<Mutex<Vec<T>>>,
}

// `Default` and `Clone` are written by hand: `#[derive]` would add `T: Default` /
// `T: Clone` bounds, which are unnecessary because only the `Arc` handle is ever
// created or cloned here, never a `T`.
impl<T> Default for CollectionSink<T> {
    /// Creates a sink with an empty buffer.
    fn default() -> Self {
        Self { data: Arc::default() }
    }
}

impl<T> Clone for CollectionSink<T> {
    /// Returns a new handle that shares this sink's buffer.
    fn clone(&self) -> Self {
        Self { data: Arc::clone(&self.data) }
    }
}
11
12impl<T> CollectionSink<T> {
13    pub fn new() -> Self {
14        Self {
15            data: Arc::new(Mutex::new(Vec::new())),
16        }
17    }
18
19    pub fn get_data(&self) -> Vec<T>
20    where
21        T: Clone,
22    {
23        self.data
24            .lock()
25            .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
26    }
27}
28
29#[async_trait]
30impl<T> Sink<T> for CollectionSink<T>
31where
32    T: Clone + Send + Sync + 'static,
33{
34    async fn init(&mut self) -> StreamResult<()> {
35        Ok(())
36    }
37
38    async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
39        if let Ok(mut data) = self.data.lock() {
40            data.push(record.data)
41        }
42        Ok(())
43    }
44
45    async fn flush(&mut self) -> StreamResult<()> {
46        Ok(())
47    }
48
49    async fn close(&mut self) -> StreamResult<()> {
50        Ok(())
51    }
52}