// datasynth_core/traits/sink.rs
use crate::error::SynthError;

8pub trait Sink {
13 type Item;
15
16 fn write(&mut self, item: Self::Item) -> Result<(), SynthError>;
18
19 fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
23 for item in items {
24 self.write(item)?;
25 }
26 Ok(())
27 }
28
29 fn flush(&mut self) -> Result<(), SynthError>;
31
32 fn close(self) -> Result<(), SynthError>
36 where
37 Self: Sized;
38
39 fn items_written(&self) -> u64;
41
42 fn bytes_written(&self) -> Option<u64> {
44 None
45 }
46}
47
/// Counter that discards every value handed to it.
///
/// Values of any type are accepted through [`NullSink::write_any`]; only the
/// number of calls is remembered.
pub struct NullSink {
    /// Number of values passed to `write_any` so far.
    count: u64,
}

impl NullSink {
    /// Creates a sink whose counter starts at zero.
    pub fn new() -> Self {
        Self { count: 0 }
    }

    /// Discards `_item` and increments the written-item counter by one.
    pub fn write_any<T>(&mut self, _item: T) {
        self.count += 1;
    }

    /// Returns how many values have been passed to [`NullSink::write_any`].
    pub fn items_written(&self) -> u64 {
        self.count
    }
}

impl Default for NullSink {
    /// Equivalent to [`NullSink::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Sink that keeps every written item in an in-memory `Vec`.
pub struct VecSink<T> {
    /// Items collected so far, in insertion order.
    items: Vec<T>,
}

impl<T> VecSink<T> {
    /// Creates an empty sink.
    pub fn new() -> Self {
        Self { items: Vec::new() }
    }

    /// Creates an empty sink whose backing vector is pre-allocated for
    /// `capacity` items.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
        }
    }

    /// Consumes the sink and returns the collected items.
    pub fn into_items(self) -> Vec<T> {
        self.items
    }

    /// Borrows the collected items as a slice.
    pub fn items(&self) -> &[T] {
        &self.items
    }
}

impl<T> Default for VecSink<T> {
    /// Equivalent to [`VecSink::new`]; places no `Default` bound on `T`.
    fn default() -> Self {
        Self::new()
    }
}

113impl<T> Sink for VecSink<T> {
114 type Item = T;
115
116 fn write(&mut self, item: Self::Item) -> Result<(), SynthError> {
117 self.items.push(item);
118 Ok(())
119 }
120
121 fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
122 self.items.extend(items);
123 Ok(())
124 }
125
126 fn flush(&mut self) -> Result<(), SynthError> {
127 Ok(())
128 }
129
130 fn close(self) -> Result<(), SynthError> {
131 Ok(())
132 }
133
134 fn items_written(&self) -> u64 {
135 self.items.len() as u64
136 }
137}
138
139pub trait PartitionedSink: Sink {
141 type PartitionKey;
143
144 fn write_to_partition(
146 &mut self,
147 partition: Self::PartitionKey,
148 item: Self::Item,
149 ) -> Result<(), SynthError>;
150
151 fn flush_partition(&mut self, partition: Self::PartitionKey) -> Result<(), SynthError>;
153}
154
/// Buffering settings for a sink.
///
/// NOTE(review): nothing in this file reads these fields, so the per-field
/// descriptions below are inferred from the names — confirm against the sinks
/// that consume this configuration.
#[derive(Debug, Clone)]
pub struct SinkBufferConfig {
    /// Item-count limit for the buffer (presumably the flush threshold).
    pub max_items: usize,
    /// Optional byte-size limit for the buffer; `None` presumably means no
    /// byte limit.
    pub max_bytes: Option<usize>,
    /// Whether to flush on every write (presumably bypassing buffering).
    pub flush_on_write: bool,
}

impl Default for SinkBufferConfig {
    /// Returns a configuration with `max_items = 10_000`,
    /// `max_bytes = Some(64 MiB)` and `flush_on_write = false`.
    fn default() -> Self {
        Self {
            max_items: 10_000,
            // 64 * 1024 * 1024 bytes = 64 MiB.
            max_bytes: Some(64 * 1024 * 1024),
            flush_on_write: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `NullSink` counts every value it is handed.
    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();
        sink.write_any(42);
        sink.write_any(43);
        assert_eq!(sink.items_written(), 2);
    }

    /// Single writes land in `VecSink` in insertion order.
    #[test]
    fn test_vec_sink() {
        let mut sink = VecSink::new();
        sink.write(1).unwrap();
        sink.write(2).unwrap();
        sink.write(3).unwrap();

        assert_eq!(sink.items_written(), 3);
        assert_eq!(sink.into_items(), vec![1, 2, 3]);
    }

    /// Consecutive batches are appended one after another.
    #[test]
    fn test_vec_sink_batch() {
        let mut sink = VecSink::new();
        sink.write_batch(vec![1, 2, 3]).unwrap();
        sink.write_batch(vec![4, 5]).unwrap();

        assert_eq!(sink.items_written(), 5);
        assert_eq!(sink.into_items(), vec![1, 2, 3, 4, 5]);
    }
}