fluxus_api/io/
collection_sink.rs1use async_trait::async_trait;
2use fluxus_sinks::Sink;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::{Arc, Mutex};
5
6#[derive(Default, Clone)]
8pub struct CollectionSink<T> {
9 data: Arc<Mutex<Vec<T>>>,
10}
11
12impl<T> CollectionSink<T> {
13 pub fn new() -> Self {
14 Self {
15 data: Arc::new(Mutex::new(Vec::new())),
16 }
17 }
18
19 pub fn get_data(&self) -> Vec<T>
20 where
21 T: Clone,
22 {
23 self.data
24 .lock()
25 .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
26 }
27
28 pub fn get_last_element(&self) -> Option<T>
29 where
30 T: Clone,
31 {
32 self.data
33 .lock()
34 .map_or_else(|p| p.into_inner().last().cloned(), |d| d.last().cloned())
35 }
36}
37
38#[async_trait]
39impl<T> Sink<T> for CollectionSink<T>
40where
41 T: Clone + Send + Sync + 'static,
42{
43 async fn init(&mut self) -> StreamResult<()> {
44 Ok(())
45 }
46
47 async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
48 if let Ok(mut data) = self.data.lock() {
49 data.push(record.data)
50 }
51 Ok(())
52 }
53
54 async fn flush(&mut self) -> StreamResult<()> {
55 Ok(())
56 }
57
58 async fn close(&mut self) -> StreamResult<()> {
59 Ok(())
60 }
61}