fluxus_api/io/
collection_sink.rs

1use async_trait::async_trait;
2use fluxus_core::{Record, Sink, StreamResult};
3use std::sync::{Arc, Mutex};
4
/// A sink that collects elements into an in-memory `Vec`.
///
/// The buffer is stored behind an `Arc<Mutex<..>>`, so cloning a sink yields
/// another handle to the *same* buffer rather than a copy of its contents.
pub struct CollectionSink<T> {
    data: Arc<Mutex<Vec<T>>>,
}

// `Clone` is written by hand because `#[derive(Clone)]` would require
// `T: Clone`, even though only the `Arc` handle is duplicated here.
impl<T> Clone for CollectionSink<T> {
    /// Returns a new handle that shares this sink's buffer.
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
        }
    }
}

// `Default` is written by hand because `#[derive(Default)]` would require
// `T: Default`, which an empty `Vec<T>` does not need.
impl<T> Default for CollectionSink<T> {
    /// Returns a sink with an empty buffer.
    fn default() -> Self {
        Self {
            data: Arc::new(Mutex::new(Vec::new())),
        }
    }
}
10
11impl<T> CollectionSink<T> {
12    pub fn new() -> Self {
13        Self {
14            data: Arc::new(Mutex::new(Vec::new())),
15        }
16    }
17
18    pub fn get_data(&self) -> Vec<T>
19    where
20        T: Clone,
21    {
22        self.data
23            .lock()
24            .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
25    }
26}
27
28#[async_trait]
29impl<T> Sink<T> for CollectionSink<T>
30where
31    T: Clone + Send + Sync + 'static,
32{
33    async fn init(&mut self) -> StreamResult<()> {
34        Ok(())
35    }
36
37    async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
38        if let Ok(mut data) = self.data.lock() {
39            data.push(record.data)
40        }
41        Ok(())
42    }
43
44    async fn flush(&mut self) -> StreamResult<()> {
45        Ok(())
46    }
47
48    async fn close(&mut self) -> StreamResult<()> {
49        Ok(())
50    }
51}