fluxus_api/io/
collection_sink.rs1use async_trait::async_trait;
2use fluxus_core::{Record, Sink, StreamResult};
3use std::sync::{Arc, Mutex};
4
5#[derive(Default, Clone)]
7pub struct CollectionSink<T> {
8 data: Arc<Mutex<Vec<T>>>,
9}
10
11impl<T> CollectionSink<T> {
12 pub fn new() -> Self {
13 Self {
14 data: Arc::new(Mutex::new(Vec::new())),
15 }
16 }
17
18 pub fn get_data(&self) -> Vec<T>
19 where
20 T: Clone,
21 {
22 self.data
23 .lock()
24 .map_or_else(|p| p.into_inner().clone(), |d| d.clone())
25 }
26}
27
28#[async_trait]
29impl<T> Sink<T> for CollectionSink<T>
30where
31 T: Clone + Send + Sync + 'static,
32{
33 async fn init(&mut self) -> StreamResult<()> {
34 Ok(())
35 }
36
37 async fn write(&mut self, record: Record<T>) -> StreamResult<()> {
38 if let Ok(mut data) = self.data.lock() {
39 data.push(record.data)
40 }
41 Ok(())
42 }
43
44 async fn flush(&mut self) -> StreamResult<()> {
45 Ok(())
46 }
47
48 async fn close(&mut self) -> StreamResult<()> {
49 Ok(())
50 }
51}