1use super::checkpoint::SinkCheckpoint;
9use super::error::SinkError;
10use super::traits::{ExactlyOnceSink, TransactionId};
11use crate::operator::Output;
12use crate::reactor::Sink;
13
/// Tunables for [`ExactlyOnceSinkAdapter`].
#[derive(Clone, Debug)]
pub struct AdapterConfig {
    /// Per-epoch output limit.
    ///
    /// NOTE(review): nothing in this file reads this field, so the limit is
    /// not enforced by the adapter as written. The default is `0`, which
    /// presumably means "unlimited" — confirm the intended semantics.
    pub max_outputs_per_epoch: usize,

    /// Whether a write issued while no transaction is open should begin one
    /// implicitly. When `false`, such a write is rejected with
    /// `SinkError::NoActiveTransaction`.
    pub auto_begin: bool,
}
22
23impl Default for AdapterConfig {
24 fn default() -> Self {
25 Self {
26 max_outputs_per_epoch: 0,
27 auto_begin: true,
28 }
29 }
30}
31
/// Counters maintained by [`ExactlyOnceSinkAdapter`].
///
/// The struct is `Copy`, so the value handed out by the adapter is a snapshot.
#[derive(Clone, Copy, Debug, Default)]
pub struct AdapterStats {
    /// Epoch most recently recorded by `notify_checkpoint` or `restore`.
    pub epoch: u64,

    /// Outputs written since the counter was last reset; it is reset to zero
    /// by `notify_checkpoint`, by `restore`, and by a `flush` that commits.
    pub epoch_output_count: u64,

    /// Number of transactions committed successfully (by `notify_checkpoint`
    /// or `flush`). Overwritten by `restore` when the checkpoint carries a
    /// `committed_epochs` metadata entry.
    pub committed_epochs: u64,

    /// Running total of outputs accepted by `write`. Overwritten by `restore`
    /// when the checkpoint carries a `total_outputs` metadata entry.
    pub total_outputs: u64,
}
44
45pub struct ExactlyOnceSinkAdapter<E: ExactlyOnceSink> {
64 inner: E,
66 current_tx: Option<TransactionId>,
68 epoch: u64,
70 config: AdapterConfig,
72 stats: AdapterStats,
74}
75
76impl<E: ExactlyOnceSink> ExactlyOnceSinkAdapter<E> {
77 pub fn new(inner: E) -> Self {
79 Self {
80 inner,
81 current_tx: None,
82 epoch: 0,
83 config: AdapterConfig::default(),
84 stats: AdapterStats::default(),
85 }
86 }
87
88 pub fn with_config(inner: E, config: AdapterConfig) -> Self {
90 Self {
91 inner,
92 current_tx: None,
93 epoch: 0,
94 config,
95 stats: AdapterStats::default(),
96 }
97 }
98
99 #[must_use]
101 pub fn inner(&self) -> &E {
102 &self.inner
103 }
104
105 pub fn inner_mut(&mut self) -> &mut E {
107 &mut self.inner
108 }
109
110 #[must_use]
112 pub fn stats(&self) -> AdapterStats {
113 self.stats
114 }
115
116 #[must_use]
118 pub fn epoch(&self) -> u64 {
119 self.epoch
120 }
121
122 #[must_use]
124 pub fn has_active_transaction(&self) -> bool {
125 self.current_tx.is_some()
126 }
127
128 fn ensure_transaction(&mut self) -> Result<TransactionId, SinkError> {
130 if let Some(ref tx_id) = self.current_tx {
131 return Ok(tx_id.clone());
132 }
133
134 if !self.config.auto_begin {
135 return Err(SinkError::NoActiveTransaction);
136 }
137
138 let tx_id = self.inner.begin_transaction()?;
139 self.current_tx = Some(tx_id.clone());
140 Ok(tx_id)
141 }
142
143 pub fn notify_checkpoint(&mut self, epoch: u64) -> Result<SinkCheckpoint, SinkError> {
152 if let Some(tx_id) = self.current_tx.take() {
154 self.inner.commit(&tx_id)?;
155 self.stats.committed_epochs += 1;
156 }
157
158 self.epoch = epoch;
159 self.stats.epoch = epoch;
160 self.stats.epoch_output_count = 0;
161
162 let mut checkpoint = self.inner.checkpoint();
164 checkpoint.set_epoch(epoch);
165 checkpoint.set_metadata("adapter_epoch", epoch.to_le_bytes().to_vec());
166 checkpoint.set_metadata(
167 "committed_epochs",
168 self.stats.committed_epochs.to_le_bytes().to_vec(),
169 );
170 checkpoint.set_metadata(
171 "total_outputs",
172 self.stats.total_outputs.to_le_bytes().to_vec(),
173 );
174
175 Ok(checkpoint)
176 }
177
178 #[allow(clippy::missing_panics_doc)]
191 pub fn restore(&mut self, checkpoint: &SinkCheckpoint) -> Result<(), SinkError> {
192 if let Some(tx_id) = self.current_tx.take() {
194 let _ = self.inner.rollback(&tx_id);
196 }
197
198 self.inner.restore(checkpoint)?;
200
201 self.epoch = checkpoint.epoch();
203 self.stats.epoch = checkpoint.epoch();
204 self.stats.epoch_output_count = 0;
205
206 if let Some(bytes) = checkpoint.get_metadata("committed_epochs") {
207 if bytes.len() >= 8 {
208 self.stats.committed_epochs = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
209 }
210 }
211 if let Some(bytes) = checkpoint.get_metadata("total_outputs") {
212 if bytes.len() >= 8 {
213 self.stats.total_outputs = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
214 }
215 }
216
217 Ok(())
218 }
219
220 #[must_use]
222 pub fn checkpoint(&self) -> SinkCheckpoint {
223 let mut cp = self.inner.checkpoint();
224 cp.set_epoch(self.epoch);
225 cp.set_metadata("adapter_epoch", self.epoch.to_le_bytes().to_vec());
226 cp.set_metadata(
227 "committed_epochs",
228 self.stats.committed_epochs.to_le_bytes().to_vec(),
229 );
230 cp.set_metadata(
231 "total_outputs",
232 self.stats.total_outputs.to_le_bytes().to_vec(),
233 );
234 cp
235 }
236}
237
238impl<E: ExactlyOnceSink> Sink for ExactlyOnceSinkAdapter<E> {
239 fn write(&mut self, outputs: Vec<Output>) -> Result<(), crate::reactor::SinkError> {
240 let tx_id = self
241 .ensure_transaction()
242 .map_err(|e| crate::reactor::SinkError::WriteFailed(e.to_string()))?;
243
244 let count = outputs.len() as u64;
245
246 self.inner
247 .write(&tx_id, outputs)
248 .map_err(crate::reactor::SinkError::from)?;
249
250 self.stats.epoch_output_count += count;
251 self.stats.total_outputs += count;
252 Ok(())
253 }
254
255 fn flush(&mut self) -> Result<(), crate::reactor::SinkError> {
256 if let Some(tx_id) = self.current_tx.take() {
258 self.inner
259 .commit(&tx_id)
260 .map_err(|e| crate::reactor::SinkError::FlushFailed(e.to_string()))?;
261 self.stats.committed_epochs += 1;
262 self.stats.epoch_output_count = 0;
263 }
264 Ok(())
265 }
266}
267
#[cfg(test)]
#[allow(clippy::cast_sign_loss)]
mod tests {
    use super::*;
    use crate::operator::Event;
    use crate::reactor::{BufferingSink, Sink as ReactorSink};
    use crate::sink::idempotent::IdempotentSink;
    use crate::sink::transactional::TransactionalSink;
    use crate::sink::InMemoryDedup;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds an event whose batch has a single `value` column with one row.
    fn make_event(timestamp: i64, value: i64) -> Event {
        let column = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", column as _)]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Default-configured adapter over a `TransactionalSink` named
    /// "test-sink" that wraps a fresh `BufferingSink`.
    fn tx_adapter() -> ExactlyOnceSinkAdapter<impl ExactlyOnceSink> {
        ExactlyOnceSinkAdapter::new(TransactionalSink::new(BufferingSink::new(), "test-sink"))
    }

    /// Writes one single-row event through the reactor `Sink` interface and
    /// panics if the write is rejected.
    fn write_event<S: ReactorSink>(sink: &mut S, timestamp: i64, value: i64) {
        ReactorSink::write(sink, vec![Output::Event(make_event(timestamp, value))]).unwrap();
    }

    #[test]
    fn test_new_adapter() {
        let adapter = tx_adapter();

        assert_eq!(adapter.epoch(), 0);
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.stats().committed_epochs, 0);
    }

    #[test]
    fn test_auto_begin_on_write() {
        let mut adapter = tx_adapter();

        write_event(&mut adapter, 1000, 42);

        assert!(adapter.has_active_transaction());
        let stats = adapter.stats();
        assert_eq!(stats.epoch_output_count, 1);
        assert_eq!(stats.total_outputs, 1);
    }

    #[test]
    fn test_checkpoint_commits_transaction() {
        let mut adapter = tx_adapter();

        // The write opens a transaction...
        write_event(&mut adapter, 1000, 42);
        assert!(adapter.has_active_transaction());

        // ...and the checkpoint notification commits it.
        let checkpoint = adapter.notify_checkpoint(1).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.epoch(), 1);
        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_epoch_advances() {
        let mut adapter = tx_adapter();

        for epoch in 1..=3 {
            write_event(&mut adapter, epoch * 1000, epoch);
            adapter.notify_checkpoint(epoch as u64).unwrap();
        }

        assert_eq!(adapter.epoch(), 3);
        let stats = adapter.stats();
        assert_eq!(stats.committed_epochs, 3);
        assert_eq!(stats.total_outputs, 3);
    }

    #[test]
    fn test_restore_rollback() {
        let mut adapter = tx_adapter();

        // Epoch 1 is written and committed.
        write_event(&mut adapter, 1000, 1);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        // A second write leaves an uncommitted transaction open.
        write_event(&mut adapter, 2000, 2);
        assert!(adapter.has_active_transaction());

        // Restoring drops the open transaction and rewinds to epoch 1.
        adapter.restore(&checkpoint).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_flush_commits() {
        let mut adapter = tx_adapter();

        write_event(&mut adapter, 1000, 42);

        ReactorSink::flush(&mut adapter).unwrap();
        assert!(!adapter.has_active_transaction());
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_stats_tracking() {
        let mut adapter = tx_adapter();

        for i in 0..3 {
            write_event(&mut adapter, i * 1000, i);
        }

        let before = adapter.stats();
        assert_eq!(before.epoch_output_count, 3);
        assert_eq!(before.total_outputs, 3);
        assert_eq!(before.committed_epochs, 0);

        adapter.notify_checkpoint(1).unwrap();

        let after = adapter.stats();
        // The per-epoch counter is reset while the running total is kept.
        assert_eq!(after.epoch_output_count, 0);
        assert_eq!(after.total_outputs, 3);
        assert_eq!(after.committed_epochs, 1);
    }

    #[test]
    fn test_compose_with_transactional_sink() {
        // Built inline: the assertion below needs the concrete inner type.
        let tx_sink = TransactionalSink::new(BufferingSink::new(), "tx-sink");
        let mut adapter = ExactlyOnceSinkAdapter::new(tx_sink);

        write_event(&mut adapter, 1000, 42);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.inner().stats().committed, 1);
    }

    #[test]
    fn test_compose_with_idempotent_sink() {
        let idem_sink = IdempotentSink::new(BufferingSink::new(), InMemoryDedup::new(1000))
            .with_sink_id("idem-sink");
        let mut adapter = ExactlyOnceSinkAdapter::new(idem_sink);

        write_event(&mut adapter, 1000, 42);
        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        assert_eq!(checkpoint.epoch(), 1);
        assert_eq!(adapter.stats().committed_epochs, 1);
    }

    #[test]
    fn test_multiple_writes_per_epoch() {
        let mut adapter = tx_adapter();

        for i in 0..5 {
            write_event(&mut adapter, i * 100, i);
        }

        assert_eq!(adapter.stats().epoch_output_count, 5);

        adapter.notify_checkpoint(1).unwrap();
        let stats = adapter.stats();
        assert_eq!(stats.total_outputs, 5);
        assert_eq!(stats.committed_epochs, 1);
    }

    #[test]
    fn test_empty_epoch_checkpoint() {
        let mut adapter = tx_adapter();

        let checkpoint = adapter.notify_checkpoint(1).unwrap();

        assert_eq!(checkpoint.epoch(), 1);
        // No transaction was open, so nothing was committed.
        assert_eq!(adapter.stats().committed_epochs, 0);
    }

    #[test]
    fn test_auto_begin_disabled() {
        let config = AdapterConfig {
            auto_begin: false,
            ..Default::default()
        };
        let tx_sink = TransactionalSink::new(BufferingSink::new(), "test-sink");
        let mut adapter = ExactlyOnceSinkAdapter::with_config(tx_sink, config);

        let result = ReactorSink::write(&mut adapter, vec![Output::Event(make_event(1000, 42))]);

        // With auto-begin off and no open transaction the write is rejected.
        assert!(result.is_err());
    }

    #[test]
    fn test_checkpoint_state_without_commit() {
        let mut adapter = tx_adapter();

        write_event(&mut adapter, 1000, 42);

        let cp = adapter.checkpoint();

        // Taking a checkpoint snapshot does not commit or advance the epoch.
        assert!(adapter.has_active_transaction());
        assert_eq!(cp.epoch(), 0);
    }
}