fluxus_api/io/
collection_sink.rs

1use async_trait::async_trait;
2use fluxus_sinks::Sink;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::{Arc, Mutex};
5
6/// A sink that collects elements into a Vec
7#[derive(Default, Clone)]
8pub struct CollectionSink<T> {
9    data: Arc<Mutex<Vec<T>>>,
10}
11
12impl<T> CollectionSink<T> {
13    pub fn new() -> Self {
14        Self {
15            data: Arc::new(Mutex::new(Vec::new())),
16        }
17    }
18
19    pub fn get_data(&self) -> Vec<T>
20    where
21        T: Clone,
22    {
23        self.data
24            .lock()
25            .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
26    }
27
28    pub fn get_last_element(&self) -> Option<T>
29    where
30        T: Clone,
31    {
32        self.data
33            .lock()
34            .map_or_else(|p| p.into_inner().last().cloned(), |d| d.last().cloned())
35    }
36}
37
38#[async_trait]
39impl<T> Sink<T> for CollectionSink<T>
40where
41    T: Clone + Send + Sync + 'static,
42{
43    async fn init(&mut self) -> StreamResult<()> {
44        Ok(())
45    }
46
47    async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
48        if let Ok(mut data) = self.data.lock() {
49            data.push(record.data)
50        }
51        Ok(())
52    }
53
54    async fn flush(&mut self) -> StreamResult<()> {
55        Ok(())
56    }
57
58    async fn close(&mut self) -> StreamResult<()> {
59        Ok(())
60    }
61}