Skip to main content

laminar_core/sink/
adapter.rs

1//! Epoch-based adapter bridging `reactor::Sink` to `ExactlyOnceSink`
2//!
3//! `ExactlyOnceSinkAdapter<E>` manages epoch-based transactions over any
4//! `ExactlyOnceSink` implementation, allowing it to be used as a `reactor::Sink`.
5//! Each epoch maps to one transaction: writes accumulate within an epoch, and
6//! `notify_checkpoint` commits the current epoch's transaction.
7
8use super::checkpoint::SinkCheckpoint;
9use super::error::SinkError;
10use super::traits::{ExactlyOnceSink, TransactionId};
11use crate::operator::Output;
12use crate::reactor::Sink;
13
/// Configuration for [`ExactlyOnceSinkAdapter`].
///
/// Implements `PartialEq`/`Eq` so that configurations can be compared directly
/// (for example in tests or when detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdapterConfig {
    /// Maximum outputs per epoch before auto-flush (0 = unlimited).
    ///
    /// NOTE(review): no adapter code visible in this file reads this field, so
    /// the limit appears not to be enforced here — confirm where (or whether)
    /// the auto-flush is implemented.
    pub max_outputs_per_epoch: usize,
    /// Whether to auto-begin a transaction on the first write (default: true).
    ///
    /// When `false`, a write without an active transaction fails with
    /// `SinkError::NoActiveTransaction`.
    pub auto_begin: bool,
}
22
23impl Default for AdapterConfig {
24    fn default() -> Self {
25        Self {
26            max_outputs_per_epoch: 0,
27            auto_begin: true,
28        }
29    }
30}
31
/// Counters describing the adapter's progress.
///
/// The value is a cheap `Copy` snapshot; `PartialEq`/`Eq` allow two snapshots
/// to be compared directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AdapterStats {
    /// Current epoch number
    pub epoch: u64,
    /// Number of outputs written in the current epoch (reset on commit)
    pub epoch_output_count: u64,
    /// Total number of committed transactions
    pub committed_epochs: u64,
    /// Total outputs written across all epochs
    pub total_outputs: u64,
}
44
45/// Bridges `reactor::Sink` to any `ExactlyOnceSink` with epoch-based transactions.
46///
47/// Each checkpoint epoch corresponds to exactly one transaction on the inner sink.
48/// The adapter auto-begins transactions on the first write (configurable) and
49/// commits them when `notify_checkpoint` is called.
50///
51/// # Usage
52///
53/// ```ignore
54/// let inner = TransactionalSink::new(my_sink, "sink-1");
55/// let mut adapter = ExactlyOnceSinkAdapter::new(inner);
56///
57/// // Write outputs (auto-begins transaction)
58/// adapter.write(outputs)?;
59///
60/// // On checkpoint, commit the epoch
61/// let checkpoint = adapter.notify_checkpoint(1)?;
62/// ```
63pub struct ExactlyOnceSinkAdapter<E: ExactlyOnceSink> {
64    /// The inner exactly-once sink
65    inner: E,
66    /// Current transaction ID (if active)
67    current_tx: Option<TransactionId>,
68    /// Current epoch
69    epoch: u64,
70    /// Configuration
71    config: AdapterConfig,
72    /// Statistics
73    stats: AdapterStats,
74}
75
76impl<E: ExactlyOnceSink> ExactlyOnceSinkAdapter<E> {
77    /// Create a new adapter wrapping the given exactly-once sink
78    pub fn new(inner: E) -> Self {
79        Self {
80            inner,
81            current_tx: None,
82            epoch: 0,
83            config: AdapterConfig::default(),
84            stats: AdapterStats::default(),
85        }
86    }
87
88    /// Create with custom configuration
89    pub fn with_config(inner: E, config: AdapterConfig) -> Self {
90        Self {
91            inner,
92            current_tx: None,
93            epoch: 0,
94            config,
95            stats: AdapterStats::default(),
96        }
97    }
98
99    /// Get a reference to the inner sink
100    #[must_use]
101    pub fn inner(&self) -> &E {
102        &self.inner
103    }
104
105    /// Get a mutable reference to the inner sink
106    pub fn inner_mut(&mut self) -> &mut E {
107        &mut self.inner
108    }
109
110    /// Get current statistics
111    #[must_use]
112    pub fn stats(&self) -> AdapterStats {
113        self.stats
114    }
115
116    /// Get the current epoch
117    #[must_use]
118    pub fn epoch(&self) -> u64 {
119        self.epoch
120    }
121
122    /// Check if there is an active transaction
123    #[must_use]
124    pub fn has_active_transaction(&self) -> bool {
125        self.current_tx.is_some()
126    }
127
128    /// Ensure a transaction is active, auto-beginning one if configured.
129    fn ensure_transaction(&mut self) -> Result<TransactionId, SinkError> {
130        if let Some(ref tx_id) = self.current_tx {
131            return Ok(tx_id.clone());
132        }
133
134        if !self.config.auto_begin {
135            return Err(SinkError::NoActiveTransaction);
136        }
137
138        let tx_id = self.inner.begin_transaction()?;
139        self.current_tx = Some(tx_id.clone());
140        Ok(tx_id)
141    }
142
143    /// Notify that a checkpoint has occurred at the given epoch.
144    ///
145    /// This commits the current transaction (if any), creates a checkpoint,
146    /// and advances the epoch.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the commit fails.
151    pub fn notify_checkpoint(&mut self, epoch: u64) -> Result<SinkCheckpoint, SinkError> {
152        // Commit current transaction if active
153        if let Some(tx_id) = self.current_tx.take() {
154            self.inner.commit(&tx_id)?;
155            self.stats.committed_epochs += 1;
156        }
157
158        self.epoch = epoch;
159        self.stats.epoch = epoch;
160        self.stats.epoch_output_count = 0;
161
162        // Get checkpoint from inner sink
163        let mut checkpoint = self.inner.checkpoint();
164        checkpoint.set_epoch(epoch);
165        checkpoint.set_metadata("adapter_epoch", epoch.to_le_bytes().to_vec());
166        checkpoint.set_metadata(
167            "committed_epochs",
168            self.stats.committed_epochs.to_le_bytes().to_vec(),
169        );
170        checkpoint.set_metadata(
171            "total_outputs",
172            self.stats.total_outputs.to_le_bytes().to_vec(),
173        );
174
175        Ok(checkpoint)
176    }
177
178    /// Restore the adapter from a checkpoint.
179    ///
180    /// Rolls back any pending transaction, restores the inner sink,
181    /// and resets the epoch.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the inner sink cannot be restored.
186    ///
187    /// # Panics
188    ///
189    /// Will not panic — all array conversions are bounds-checked before unwrapping.
190    #[allow(clippy::missing_panics_doc)]
191    pub fn restore(&mut self, checkpoint: &SinkCheckpoint) -> Result<(), SinkError> {
192        // Rollback pending transaction if any
193        if let Some(tx_id) = self.current_tx.take() {
194            // Best-effort rollback; ignore errors since we're recovering
195            let _ = self.inner.rollback(&tx_id);
196        }
197
198        // Restore inner sink
199        self.inner.restore(checkpoint)?;
200
201        // Restore adapter state
202        self.epoch = checkpoint.epoch();
203        self.stats.epoch = checkpoint.epoch();
204        self.stats.epoch_output_count = 0;
205
206        if let Some(bytes) = checkpoint.get_metadata("committed_epochs") {
207            if bytes.len() >= 8 {
208                self.stats.committed_epochs = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
209            }
210        }
211        if let Some(bytes) = checkpoint.get_metadata("total_outputs") {
212            if bytes.len() >= 8 {
213                self.stats.total_outputs = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
214            }
215        }
216
217        Ok(())
218    }
219
220    /// Get the current checkpoint state without committing.
221    #[must_use]
222    pub fn checkpoint(&self) -> SinkCheckpoint {
223        let mut cp = self.inner.checkpoint();
224        cp.set_epoch(self.epoch);
225        cp.set_metadata("adapter_epoch", self.epoch.to_le_bytes().to_vec());
226        cp.set_metadata(
227            "committed_epochs",
228            self.stats.committed_epochs.to_le_bytes().to_vec(),
229        );
230        cp.set_metadata(
231            "total_outputs",
232            self.stats.total_outputs.to_le_bytes().to_vec(),
233        );
234        cp
235    }
236}
237
238impl<E: ExactlyOnceSink> Sink for ExactlyOnceSinkAdapter<E> {
239    fn write(&mut self, outputs: Vec<Output>) -> Result<(), crate::reactor::SinkError> {
240        let tx_id = self
241            .ensure_transaction()
242            .map_err(|e| crate::reactor::SinkError::WriteFailed(e.to_string()))?;
243
244        let count = outputs.len() as u64;
245
246        self.inner
247            .write(&tx_id, outputs)
248            .map_err(crate::reactor::SinkError::from)?;
249
250        self.stats.epoch_output_count += count;
251        self.stats.total_outputs += count;
252        Ok(())
253    }
254
255    fn flush(&mut self) -> Result<(), crate::reactor::SinkError> {
256        // Commit the current transaction on flush
257        if let Some(tx_id) = self.current_tx.take() {
258            self.inner
259                .commit(&tx_id)
260                .map_err(|e| crate::reactor::SinkError::FlushFailed(e.to_string()))?;
261            self.stats.committed_epochs += 1;
262            self.stats.epoch_output_count = 0;
263        }
264        Ok(())
265    }
266}
267
#[cfg(test)]
#[allow(clippy::cast_sign_loss)]
mod tests {
    use super::*;
    use crate::operator::Event;
    use crate::reactor::{BufferingSink, Sink as ReactorSink};
    use crate::sink::idempotent::IdempotentSink;
    use crate::sink::transactional::TransactionalSink;
    use crate::sink::InMemoryDedup;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds a one-row event whose single `value` column holds `value`.
    fn make_event(timestamp: i64, value: i64) -> Event {
        let column = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", column as _)]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Default-configured adapter over a `TransactionalSink` named "test-sink".
    fn tx_adapter() -> ExactlyOnceSinkAdapter<impl ExactlyOnceSink> {
        let tx_sink = TransactionalSink::new(BufferingSink::new(), "test-sink");
        ExactlyOnceSinkAdapter::new(tx_sink)
    }

    /// Writes a single event through the `reactor::Sink` interface, panicking on error.
    fn write_one(sink: &mut impl ReactorSink, timestamp: i64, value: i64) {
        ReactorSink::write(sink, vec![Output::Event(make_event(timestamp, value))]).unwrap();
    }

    #[test]
    fn test_new_adapter() {
        let adapter = tx_adapter();

        assert_eq!(adapter.epoch(), 0);
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.stats().committed_epochs, 0);
    }

    #[test]
    fn test_auto_begin_on_write() {
        let mut adapter = tx_adapter();

        write_one(&mut adapter, 1000, 42);

        assert!(adapter.has_active_transaction());
        let stats = adapter.stats();
        assert_eq!(stats.epoch_output_count, 1);
        assert_eq!(stats.total_outputs, 1);
    }

    #[test]
    fn test_checkpoint_commits_transaction() {
        let mut adapter = tx_adapter();

        // A write opens the transaction...
        write_one(&mut adapter, 1000, 42);
        assert!(adapter.has_active_transaction());

        // ...and the checkpoint commits it.
        let checkpoint = adapter.notify_checkpoint(1).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.epoch(), 1);
        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_epoch_advances() {
        let mut adapter = tx_adapter();

        for epoch in 1_i64..=3 {
            write_one(&mut adapter, epoch * 1000, epoch);
            adapter.notify_checkpoint(epoch as u64).unwrap();
        }

        assert_eq!(adapter.epoch(), 3);
        let stats = adapter.stats();
        assert_eq!(stats.committed_epochs, 3);
        assert_eq!(stats.total_outputs, 3);
    }

    #[test]
    fn test_restore_rollback() {
        let mut adapter = tx_adapter();

        // Epoch 1 is written and committed.
        write_one(&mut adapter, 1000, 1);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        // Epoch 2 is started but never committed (simulates a crash).
        write_one(&mut adapter, 2000, 2);
        assert!(adapter.has_active_transaction());

        // Restoring the epoch-1 checkpoint discards the pending transaction.
        adapter.restore(&checkpoint).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_flush_commits() {
        let mut adapter = tx_adapter();

        write_one(&mut adapter, 1000, 42);

        ReactorSink::flush(&mut adapter).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_stats_tracking() {
        let mut adapter = tx_adapter();

        // Three writes inside one epoch, not yet committed.
        for i in 0_i64..3 {
            write_one(&mut adapter, i * 1000, i);
        }

        let before = adapter.stats();
        assert_eq!(before.epoch_output_count, 3);
        assert_eq!(before.total_outputs, 3);
        assert_eq!(before.committed_epochs, 0);

        adapter.notify_checkpoint(1).unwrap();
        let after = adapter.stats();
        assert_eq!(after.epoch_output_count, 0); // reset after checkpoint
        assert_eq!(after.total_outputs, 3);
        assert_eq!(after.committed_epochs, 1);
    }

    #[test]
    fn test_compose_with_transactional_sink() {
        // Built by hand: the assertion below needs the concrete inner sink type.
        let tx_sink = TransactionalSink::new(BufferingSink::new(), "tx-sink");
        let mut adapter = ExactlyOnceSinkAdapter::new(tx_sink);

        write_one(&mut adapter, 1000, 42);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.inner().stats().committed, 1);
    }

    #[test]
    fn test_compose_with_idempotent_sink() {
        let dedup = InMemoryDedup::new(1000);
        let idem_sink = IdempotentSink::new(BufferingSink::new(), dedup).with_sink_id("idem-sink");
        let mut adapter = ExactlyOnceSinkAdapter::new(idem_sink);

        write_one(&mut adapter, 1000, 42);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_multiple_writes_per_epoch() {
        let mut adapter = tx_adapter();

        // Several writes share the same epoch and transaction.
        for i in 0_i64..5 {
            write_one(&mut adapter, i * 100, i);
        }

        assert_eq!(adapter.stats().epoch_output_count, 5);

        adapter.notify_checkpoint(1).unwrap();
        let stats = adapter.stats();
        assert_eq!(stats.total_outputs, 5);
        assert_eq!(stats.committed_epochs, 1);
    }

    #[test]
    fn test_empty_epoch_checkpoint() {
        let mut adapter = tx_adapter();

        // No writes happened, so there is no transaction to commit.
        let checkpoint = adapter.notify_checkpoint(1).unwrap();
        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 0); // nothing was committed
    }

    #[test]
    fn test_auto_begin_disabled() {
        let tx_sink = TransactionalSink::new(BufferingSink::new(), "test-sink");
        let config = AdapterConfig {
            auto_begin: false,
            ..Default::default()
        };
        let mut adapter = ExactlyOnceSinkAdapter::with_config(tx_sink, config);

        let outputs = vec![Output::Event(make_event(1000, 42))];
        let result = ReactorSink::write(&mut adapter, outputs);
        assert!(result.is_err()); // Should fail because auto-begin is off
    }

    #[test]
    fn test_checkpoint_state_without_commit() {
        let mut adapter = tx_adapter();

        // Leave a transaction open.
        write_one(&mut adapter, 1000, 42);

        // checkpoint() only snapshots state; it must not commit.
        let cp = adapter.checkpoint();
        assert!(adapter.has_active_transaction()); // still active
        assert_eq!(cp.epoch(), 0); // epoch hasn't advanced
    }
}