Skip to main content

datasynth_core/traits/
sink.rs

1//! Output Sink trait for writing generated data.
2//!
3//! Defines the interface for output destinations including files,
4//! streams, and databases.
5
6use crate::error::SynthError;
7
8/// Core trait for output sinks.
9///
10/// Sinks receive generated data and write it to a destination.
11/// They handle batching, buffering, and format conversion.
12pub trait Sink {
13    /// The type of items this sink accepts.
14    type Item;
15
16    /// Write a single item to the sink.
17    fn write(&mut self, item: Self::Item) -> Result<(), SynthError>;
18
19    /// Write a batch of items to the sink.
20    ///
21    /// Default implementation calls write repeatedly.
22    fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
23        for item in items {
24            self.write(item)?;
25        }
26        Ok(())
27    }
28
29    /// Flush any buffered data to the destination.
30    fn flush(&mut self) -> Result<(), SynthError>;
31
32    /// Close the sink and release resources.
33    ///
34    /// After calling close, the sink should not be used.
35    fn close(self) -> Result<(), SynthError>
36    where
37        Self: Sized;
38
39    /// Get the number of items written.
40    fn items_written(&self) -> u64;
41
42    /// Get the number of bytes written (if applicable).
43    fn bytes_written(&self) -> Option<u64> {
44        None
45    }
46}
47
/// A sink that discards everything it receives and only counts items
/// (useful for benchmarking generators without paying any output cost).
///
/// `NullSink` does not implement [`Sink`]: instead of a fixed `Item` type,
/// [`NullSink::write_any`] accepts a value of any type and drops it.
///
/// ```ignore
/// let mut sink = NullSink::new();
/// sink.write_any(item);
/// assert_eq!(sink.items_written(), 1);
/// ```
#[derive(Debug, Clone, Default)]
pub struct NullSink {
    /// Number of items passed to `write_any` so far.
    count: u64,
}

impl NullSink {
    /// Creates a sink whose item count starts at zero.
    pub const fn new() -> Self {
        Self { count: 0 }
    }

    /// Discards `_item` (of any type) and increments the item count.
    pub fn write_any<T>(&mut self, _item: T) {
        self.count += 1;
    }

    /// Returns the number of items passed to [`NullSink::write_any`].
    pub fn items_written(&self) -> u64 {
        self.count
    }
}
79
/// A sink that collects every item it receives into an in-memory `Vec`.
#[derive(Debug, Clone)]
pub struct VecSink<T> {
    /// Items received so far, in arrival order.
    items: Vec<T>,
}

impl<T> VecSink<T> {
    /// Creates an empty sink without allocating.
    pub const fn new() -> Self {
        Self { items: Vec::new() }
    }

    /// Creates an empty sink whose buffer can hold at least `capacity`
    /// items before it needs to reallocate.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
        }
    }

    /// Consumes the sink and returns the collected items.
    pub fn into_items(self) -> Vec<T> {
        self.items
    }

    /// Returns the items collected so far.
    pub fn items(&self) -> &[T] {
        &self.items
    }
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound that an empty vector does not need.
impl<T> Default for VecSink<T> {
    fn default() -> Self {
        Self::new()
    }
}
112
113impl<T> Sink for VecSink<T> {
114    type Item = T;
115
116    fn write(&mut self, item: Self::Item) -> Result<(), SynthError> {
117        self.items.push(item);
118        Ok(())
119    }
120
121    fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
122        self.items.extend(items);
123        Ok(())
124    }
125
126    fn flush(&mut self) -> Result<(), SynthError> {
127        Ok(())
128    }
129
130    fn close(self) -> Result<(), SynthError> {
131        Ok(())
132    }
133
134    fn items_written(&self) -> u64 {
135        self.items.len() as u64
136    }
137}
138
139/// Trait for sinks that support partitioned output.
140pub trait PartitionedSink: Sink {
141    /// The partition key type.
142    type PartitionKey;
143
144    /// Write an item to a specific partition.
145    fn write_to_partition(
146        &mut self,
147        partition: Self::PartitionKey,
148        item: Self::Item,
149    ) -> Result<(), SynthError>;
150
151    /// Flush a specific partition.
152    fn flush_partition(&mut self, partition: Self::PartitionKey) -> Result<(), SynthError>;
153}
154
/// Thresholds that control when a buffering sink flushes its output.
#[derive(Debug, Clone)]
pub struct SinkBufferConfig {
    /// Flush once this many items are buffered.
    pub max_items: usize,
    /// Flush once this many bytes are buffered, for sinks that can measure
    /// bytes; `None` means no byte threshold is configured.
    pub max_bytes: Option<usize>,
    /// Flush after every single write (intended for debugging).
    pub flush_on_write: bool,
}

impl Default for SinkBufferConfig {
    /// 10,000 items, 64 MiB, and no per-write flushing.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        let max_items = 10_000;
        let max_bytes = Some(64 * MIB);
        let flush_on_write = false;

        SinkBufferConfig {
            max_items,
            max_bytes,
            flush_on_write,
        }
    }
}
175
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();
        sink.write_any(42);
        sink.write_any(43);
        assert_eq!(sink.items_written(), 2);
    }

    #[test]
    fn test_null_sink_default_is_empty() {
        assert_eq!(NullSink::default().items_written(), 0);
    }

    #[test]
    fn test_vec_sink() {
        let mut sink = VecSink::new();
        sink.write(1).unwrap();
        sink.write(2).unwrap();
        sink.write(3).unwrap();

        assert_eq!(sink.items_written(), 3);
        assert_eq!(sink.into_items(), vec![1, 2, 3]);
    }

    #[test]
    fn test_vec_sink_batch() {
        let mut sink = VecSink::new();
        sink.write_batch(vec![1, 2, 3]).unwrap();
        sink.write_batch(vec![4, 5]).unwrap();

        assert_eq!(sink.items_written(), 5);
        assert_eq!(sink.into_items(), vec![1, 2, 3, 4, 5]);
    }

    /// Single writes and batches interleave in arrival order, and the
    /// no-op `flush`/`close` and default `bytes_written` behave as expected.
    #[test]
    fn test_vec_sink_mixed_writes_preserve_order() {
        let mut sink = VecSink::with_capacity(4);
        sink.write(1).unwrap();
        sink.write_batch(vec![2, 3]).unwrap();
        sink.write(4).unwrap();
        sink.flush().unwrap();

        assert_eq!(sink.items(), &[1, 2, 3, 4]);
        assert_eq!(sink.items_written(), 4);
        assert_eq!(sink.bytes_written(), None);
        sink.close().unwrap();
    }

    #[test]
    fn test_vec_sink_default_is_empty() {
        let sink: VecSink<u8> = VecSink::default();
        assert!(sink.items().is_empty());
        assert_eq!(sink.items_written(), 0);
    }

    #[test]
    fn test_sink_buffer_config_default() {
        let config = SinkBufferConfig::default();
        assert_eq!(config.max_items, 10_000);
        assert_eq!(config.max_bytes, Some(64 * 1024 * 1024));
        assert!(!config.flush_on_write);
    }
}
208}