fluxus_api/io/
collection_sink.rs1use async_trait::async_trait;
2use fluxus_sinks::Sink;
3use fluxus_utils::models::{Record, StreamResult};
4use std::sync::{Arc, Mutex};
5
/// A sink that accumulates every value written to it in an in-memory `Vec`.
///
/// The buffer lives behind an `Arc<Mutex<..>>`, so cloning the sink yields a
/// second handle to the *same* buffer rather than an independent copy.
pub struct CollectionSink<T> {
    /// Shared, lock-protected buffer of collected values.
    data: Arc<Mutex<Vec<T>>>,
}

// `Default` and `Clone` are written by hand instead of derived: the derive
// macros would add `T: Default` / `T: Clone` bounds, which are unnecessary
// because only the `Arc` handle is created or duplicated, never a `T`.
impl<T> Default for CollectionSink<T> {
    /// Creates a sink backed by a fresh, empty buffer.
    fn default() -> Self {
        Self {
            data: Arc::new(Mutex::new(Vec::new())),
        }
    }
}

impl<T> Clone for CollectionSink<T> {
    /// Returns another handle to the same underlying buffer.
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
        }
    }
}
11
12impl<T> CollectionSink<T> {
13 pub fn new() -> Self {
14 Self {
15 data: Arc::new(Mutex::new(Vec::new())),
16 }
17 }
18
19 pub fn get_data(&self) -> Vec<T>
20 where
21 T: Clone,
22 {
23 self.data
24 .lock()
25 .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
26 }
27}
28
29#[async_trait]
30impl<T> Sink<T> for CollectionSink<T>
31where
32 T: Clone + Send + Sync + 'static,
33{
34 async fn init(&mut self) -> StreamResult<()> {
35 Ok(())
36 }
37
38 async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
39 if let Ok(mut data) = self.data.lock() {
40 data.push(record.data)
41 }
42 Ok(())
43 }
44
45 async fn flush(&mut self) -> StreamResult<()> {
46 Ok(())
47 }
48
49 async fn close(&mut self) -> StreamResult<()> {
50 Ok(())
51 }
52}