// laminar_core/sink/traits.rs

//! Traits for exactly-once sink semantics
2
3#![allow(clippy::cast_possible_truncation)]
4
5use super::checkpoint::SinkCheckpoint;
6use super::error::SinkError;
7use crate::operator::Output;
8
/// Unique identifier for a transaction.
///
/// Combines a numeric ID with an optional string identifier for external
/// systems. Two IDs compare equal only when both parts match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TransactionId {
    /// The transaction ID value
    id: u64,
    /// Optional string identifier for external systems
    external_id: Option<String>,
}

impl TransactionId {
    /// Width in bytes of the little-endian numeric ID that starts every encoding.
    const ID_LEN: usize = 8;
    /// Width in bytes of the little-endian `u32` length prefix that precedes
    /// the external identifier.
    const LEN_PREFIX_LEN: usize = 4;

    /// Create a new transaction ID from a numeric value, with no external ID.
    #[must_use]
    pub fn new(id: u64) -> Self {
        Self {
            id,
            external_id: None,
        }
    }

    /// Create a transaction ID that also carries an external identifier.
    #[must_use]
    pub fn with_external(id: u64, external_id: String) -> Self {
        Self {
            id,
            external_id: Some(external_id),
        }
    }

    /// Get the numeric ID.
    #[must_use]
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Get the external ID if present.
    #[must_use]
    pub fn external_id(&self) -> Option<&str> {
        self.external_id.as_deref()
    }

    /// Serialize to bytes for checkpointing.
    ///
    /// Layout: the numeric ID as 8 little-endian bytes. When an external ID is
    /// present it follows as a 4-byte little-endian length and then the UTF-8
    /// bytes of the string. An ID without an external part is exactly 8 bytes.
    ///
    /// The length prefix is a `u32`, so an external ID of 4 GiB or more cannot
    /// be represented; its length would be truncated by the cast below.
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let ext = self.external_id.as_deref();
        let ext_encoded_len = ext.map_or(0, |e| Self::LEN_PREFIX_LEN + e.len());

        // Size the buffer once instead of growing it on every append.
        let mut bytes = Vec::with_capacity(Self::ID_LEN + ext_encoded_len);
        bytes.extend_from_slice(&self.id.to_le_bytes());
        if let Some(ext) = ext {
            bytes.extend_from_slice(&(ext.len() as u32).to_le_bytes());
            bytes.extend_from_slice(ext.as_bytes());
        }
        bytes
    }

    /// Deserialize from bytes produced by [`TransactionId::to_bytes`].
    ///
    /// Returns `None` when `bytes` is shorter than the 8-byte numeric ID.
    ///
    /// The external ID is decoded leniently:
    /// * no complete 4-byte length prefix after the ID -> no external ID;
    /// * declared length longer than the remaining bytes -> no external ID;
    /// * declared length of zero -> `Some("")`, so an empty external ID
    ///   survives a round trip;
    /// * invalid UTF-8 is replaced with U+FFFD rather than rejected;
    /// * bytes after the external ID are ignored.
    #[must_use]
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::ID_LEN {
            return None;
        }
        let (id_bytes, rest) = bytes.split_at(Self::ID_LEN);

        let mut id_buf = [0u8; Self::ID_LEN];
        id_buf.copy_from_slice(id_bytes);
        let id = u64::from_le_bytes(id_buf);

        Some(Self {
            id,
            external_id: Self::decode_external_id(rest),
        })
    }

    /// Decode the optional length-prefixed external ID from the bytes that
    /// follow the numeric ID.
    fn decode_external_id(rest: &[u8]) -> Option<String> {
        let prefix = rest.get(..Self::LEN_PREFIX_LEN)?;
        let mut len_buf = [0u8; Self::LEN_PREFIX_LEN];
        len_buf.copy_from_slice(prefix);
        let len = u32::from_le_bytes(len_buf) as usize;

        // `checked_add` keeps a huge declared length from wrapping on targets
        // where `usize` is 32 bits; `get` then rejects out-of-range slices.
        let end = Self::LEN_PREFIX_LEN.checked_add(len)?;
        let payload = rest.get(Self::LEN_PREFIX_LEN..end)?;
        Some(String::from_utf8_lossy(payload).into_owned())
    }
}

impl std::fmt::Display for TransactionId {
    /// Formats as `tx-<id>`, or `tx-<id>-<external_id>` when an external ID is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.external_id.as_deref() {
            Some(ext) => write!(f, "tx-{}-{}", self.id, ext),
            None => write!(f, "tx-{}", self.id),
        }
    }
}
90
/// Capabilities that a sink may support.
///
/// Every capability is off by default; enable the ones a sink supports by
/// chaining the `with_*` builders, and query them with the `supports_*`
/// accessors.
// Each flag is an independent capability toggled by its own builder, so a
// set of plain bools is intentional here.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)]
pub struct SinkCapabilities {
    /// Supports transactional writes
    transactions: bool,
    /// Supports idempotent writes (safe to retry)
    idempotent_writes: bool,
    /// Supports upsert semantics (update or insert)
    upsert: bool,
    /// Supports changelog/CDC records
    changelog: bool,
    /// Supports two-phase commit
    two_phase_commit: bool,
    /// Supports partitioned writes
    partitioned: bool,
}

impl SinkCapabilities {
    /// Create new capabilities with all disabled.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable transaction support, leaving every other flag as it was.
    #[must_use]
    pub fn with_transactions(self) -> Self {
        Self {
            transactions: true,
            ..self
        }
    }

    /// Enable idempotent write support, leaving every other flag as it was.
    #[must_use]
    pub fn with_idempotent_writes(self) -> Self {
        Self {
            idempotent_writes: true,
            ..self
        }
    }

    /// Enable upsert support, leaving every other flag as it was.
    #[must_use]
    pub fn with_upsert(self) -> Self {
        Self {
            upsert: true,
            ..self
        }
    }

    /// Enable changelog support, leaving every other flag as it was.
    #[must_use]
    pub fn with_changelog_support(self) -> Self {
        Self {
            changelog: true,
            ..self
        }
    }

    /// Enable two-phase commit support, leaving every other flag as it was.
    #[must_use]
    pub fn with_two_phase_commit(self) -> Self {
        Self {
            two_phase_commit: true,
            ..self
        }
    }

    /// Enable partitioned write support, leaving every other flag as it was.
    #[must_use]
    pub fn with_partitioned(self) -> Self {
        Self {
            partitioned: true,
            ..self
        }
    }

    /// Check if transactions are supported.
    #[must_use]
    pub fn supports_transactions(&self) -> bool {
        self.transactions
    }

    /// Check if idempotent writes are supported.
    #[must_use]
    pub fn supports_idempotent_writes(&self) -> bool {
        self.idempotent_writes
    }

    /// Check if upsert is supported.
    #[must_use]
    pub fn supports_upsert(&self) -> bool {
        self.upsert
    }

    /// Check if changelog is supported.
    #[must_use]
    pub fn supports_changelog(&self) -> bool {
        self.changelog
    }

    /// Check if two-phase commit is supported.
    #[must_use]
    pub fn supports_two_phase_commit(&self) -> bool {
        self.two_phase_commit
    }

    /// Check if partitioned writes are supported.
    #[must_use]
    pub fn supports_partitioned(&self) -> bool {
        self.partitioned
    }
}
194
/// Current state of a sink.
///
/// The predicates below say which operations each state permits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SinkState {
    /// Sink is idle, ready to accept writes
    Idle,
    /// Sink has an active transaction
    InTransaction,
    /// Sink is preparing to commit (2PC phase 1)
    Preparing,
    /// Sink is committing
    Committing,
    /// Sink encountered an error
    Error,
    /// Sink is closed
    Closed,
}

impl SinkState {
    // The matches below list every variant explicitly instead of using a
    // catch-all, so adding a variant is a compile error until each predicate
    // has been given an answer for it.

    /// Check if the sink can accept writes.
    ///
    /// True for `Idle` and `InTransaction` only.
    #[must_use]
    pub fn can_write(&self) -> bool {
        match self {
            Self::Idle | Self::InTransaction => true,
            Self::Preparing | Self::Committing | Self::Error | Self::Closed => false,
        }
    }

    /// Check if the sink can begin a transaction.
    ///
    /// True for `Idle` only.
    #[must_use]
    pub fn can_begin_transaction(&self) -> bool {
        match self {
            Self::Idle => true,
            Self::InTransaction
            | Self::Preparing
            | Self::Committing
            | Self::Error
            | Self::Closed => false,
        }
    }

    /// Check if the sink can commit.
    ///
    /// True for `InTransaction` and `Preparing` only.
    #[must_use]
    pub fn can_commit(&self) -> bool {
        match self {
            Self::InTransaction | Self::Preparing => true,
            Self::Idle | Self::Committing | Self::Error | Self::Closed => false,
        }
    }
}
231
232/// Trait for sinks that support exactly-once delivery semantics.
233///
234/// This trait extends the basic `Sink` trait with transaction support,
235/// enabling exactly-once delivery through atomic commits.
236///
237/// # Transaction Flow
238///
239/// ```text
240/// begin_transaction() ──► write() ──► write() ──► commit()
241///                              │              │
242///                              ▼              ▼
243///                         rollback() on failure
244/// ```
245///
246/// # Recovery
247///
248/// On recovery:
249/// 1. Load `SinkCheckpoint` from checkpoint
250/// 2. If pending transaction exists, call `rollback()`
251/// 3. Resume from last committed offset
252pub trait ExactlyOnceSink: Send {
253    /// Get the sink's capabilities
254    fn capabilities(&self) -> SinkCapabilities;
255
256    /// Get the current state of the sink
257    fn state(&self) -> SinkState;
258
259    /// Begin a new transaction.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if a transaction is already active or if the
264    /// sink cannot begin a transaction.
265    fn begin_transaction(&mut self) -> Result<TransactionId, SinkError>;
266
267    /// Write outputs within the current transaction.
268    ///
269    /// Writes are not visible until `commit()` is called.
270    ///
271    /// # Arguments
272    ///
273    /// * `tx_id` - The transaction ID returned by `begin_transaction()`
274    /// * `outputs` - The outputs to write
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the transaction ID is invalid or if the
279    /// write operation fails.
280    fn write(&mut self, tx_id: &TransactionId, outputs: Vec<Output>) -> Result<(), SinkError>;
281
282    /// Commit the current transaction.
283    ///
284    /// After commit, all writes become visible and durable.
285    ///
286    /// # Arguments
287    ///
288    /// * `tx_id` - The transaction ID to commit
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the transaction ID is invalid or if the
293    /// commit operation fails.
294    fn commit(&mut self, tx_id: &TransactionId) -> Result<(), SinkError>;
295
296    /// Rollback the current transaction.
297    ///
298    /// Discards all writes made in the transaction.
299    ///
300    /// # Arguments
301    ///
302    /// * `tx_id` - The transaction ID to rollback
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the transaction ID is invalid or if the
307    /// rollback operation fails.
308    fn rollback(&mut self, tx_id: &TransactionId) -> Result<(), SinkError>;
309
310    /// Get the current checkpoint state for persistence.
311    ///
312    /// This should be called during checkpointing to capture the
313    /// sink's state for recovery.
314    fn checkpoint(&self) -> SinkCheckpoint;
315
316    /// Restore the sink from a checkpoint.
317    ///
318    /// Called during recovery to restore the sink's state.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if the checkpoint is invalid or if the
323    /// sink cannot be restored.
324    fn restore(&mut self, checkpoint: &SinkCheckpoint) -> Result<(), SinkError>;
325
326    /// Flush any buffered data.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the flush operation fails.
331    fn flush(&mut self) -> Result<(), SinkError>;
332
333    /// Close the sink and release resources.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if the sink cannot be closed cleanly.
338    fn close(&mut self) -> Result<(), SinkError>;
339}
340
/// Unit tests for the value types in this module: `TransactionId`,
/// `SinkCapabilities` and `SinkState`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transaction_id_new() {
        let tx = TransactionId::new(123);
        assert_eq!(tx.id(), 123);
        assert_eq!(tx.external_id(), None);
        assert_eq!(tx.to_string(), "tx-123");
    }

    #[test]
    fn test_transaction_id_with_external() {
        let tx = TransactionId::with_external(456, "kafka-tx-abc".to_string());
        assert_eq!(tx.id(), 456);
        assert_eq!(tx.external_id(), Some("kafka-tx-abc"));
        // Pin the exact rendering, not just that the external ID appears.
        assert_eq!(tx.to_string(), "tx-456-kafka-tx-abc");
    }

    #[test]
    fn test_transaction_id_serialization() {
        let tx = TransactionId::with_external(789, "test".to_string());
        let bytes = tx.to_bytes();
        // 8-byte ID + 4-byte length prefix + 4 payload bytes.
        assert_eq!(bytes.len(), 16);
        let restored = TransactionId::from_bytes(&bytes).unwrap();
        assert_eq!(tx, restored);
    }

    #[test]
    fn test_transaction_id_serialization_without_external() {
        let tx = TransactionId::new(42);
        let bytes = tx.to_bytes();
        assert_eq!(bytes, 42u64.to_le_bytes().to_vec());
        assert_eq!(TransactionId::from_bytes(&bytes), Some(tx));
    }

    #[test]
    fn test_transaction_id_from_short_input() {
        // Fewer than 8 bytes cannot hold the numeric ID.
        assert_eq!(TransactionId::from_bytes(&[]), None);
        assert_eq!(TransactionId::from_bytes(&[0u8; 7]), None);
    }

    #[test]
    fn test_transaction_id_from_truncated_external() {
        // The length prefix claims 10 bytes but only 3 follow: the numeric ID
        // is still recovered and the external ID is dropped.
        let mut bytes = 5u64.to_le_bytes().to_vec();
        bytes.extend_from_slice(&10u32.to_le_bytes());
        bytes.extend_from_slice(b"abc");
        assert_eq!(TransactionId::from_bytes(&bytes), Some(TransactionId::new(5)));
    }

    #[test]
    fn test_sink_capabilities() {
        let caps = SinkCapabilities::new();
        assert!(!caps.supports_transactions());
        assert!(!caps.supports_upsert());

        let caps = SinkCapabilities::new().with_transactions().with_upsert();
        assert!(caps.supports_transactions());
        assert!(caps.supports_upsert());
        assert!(!caps.supports_changelog());
    }

    #[test]
    fn test_sink_capabilities_all_builders() {
        let caps = SinkCapabilities::new()
            .with_transactions()
            .with_idempotent_writes()
            .with_upsert()
            .with_changelog_support()
            .with_two_phase_commit()
            .with_partitioned();
        assert!(caps.supports_transactions());
        assert!(caps.supports_idempotent_writes());
        assert!(caps.supports_upsert());
        assert!(caps.supports_changelog());
        assert!(caps.supports_two_phase_commit());
        assert!(caps.supports_partitioned());
    }

    #[test]
    fn test_sink_state_transitions() {
        assert!(SinkState::Idle.can_begin_transaction());
        assert!(SinkState::Idle.can_write());
        assert!(!SinkState::Idle.can_commit());

        assert!(!SinkState::InTransaction.can_begin_transaction());
        assert!(SinkState::InTransaction.can_write());
        assert!(SinkState::InTransaction.can_commit());

        assert!(!SinkState::Error.can_write());
        assert!(!SinkState::Closed.can_write());
    }

    #[test]
    fn test_sink_state_commit_phases() {
        // Phase 1 of 2PC may still commit but no longer accepts writes.
        assert!(SinkState::Preparing.can_commit());
        assert!(!SinkState::Preparing.can_write());
        assert!(!SinkState::Preparing.can_begin_transaction());

        // While committing, no further operation is permitted.
        assert!(!SinkState::Committing.can_commit());
        assert!(!SinkState::Committing.can_write());
        assert!(!SinkState::Committing.can_begin_transaction());

        assert!(!SinkState::Error.can_commit());
        assert!(!SinkState::Closed.can_commit());
        assert!(!SinkState::Closed.can_begin_transaction());
    }
}