laminar_core/sink/
traits.rs1#![allow(clippy::cast_possible_truncation)]
4
5use super::checkpoint::SinkCheckpoint;
6use super::error::SinkError;
7use crate::operator::Output;
8
/// Identifier of a sink transaction.
///
/// Pairs a numeric `id` with an optional, caller-supplied string identifier
/// (`external_id`). The value can be serialized with [`TransactionId::to_bytes`]
/// and read back with [`TransactionId::from_bytes`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TransactionId {
    /// Numeric transaction identifier.
    id: u64,
    /// Optional string identifier attached via [`TransactionId::with_external`].
    external_id: Option<String>,
}

impl TransactionId {
    /// Creates a transaction id without an external identifier.
    #[must_use]
    pub fn new(id: u64) -> Self {
        Self {
            id,
            external_id: None,
        }
    }

    /// Creates a transaction id that also carries `external_id`.
    #[must_use]
    pub fn with_external(id: u64, external_id: String) -> Self {
        Self {
            id,
            external_id: Some(external_id),
        }
    }

    /// Returns the numeric identifier.
    #[must_use]
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Returns the external identifier, if one was set.
    #[must_use]
    pub fn external_id(&self) -> Option<&str> {
        self.external_id.as_deref()
    }

    /// Serializes the id into a byte vector.
    ///
    /// Layout:
    /// * bytes `0..8`: `id` as little-endian `u64`;
    /// * only when an external id is present: bytes `8..12` hold its byte
    ///   length as little-endian `u32`, followed by its UTF-8 bytes.
    ///
    /// The length prefix is produced with an `as u32` cast, so an external id
    /// longer than `u32::MAX` bytes would get a truncated length prefix.
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        // Reserve the exact size up front: 8 bytes of id plus, when present,
        // the 4-byte length prefix and the payload.
        let external_len = self.external_id.as_ref().map_or(0, |ext| 4 + ext.len());
        let mut bytes = Vec::with_capacity(8 + external_len);

        bytes.extend_from_slice(&self.id.to_le_bytes());
        if let Some(ext) = &self.external_id {
            bytes.extend_from_slice(&(ext.len() as u32).to_le_bytes());
            bytes.extend_from_slice(ext.as_bytes());
        }
        bytes
    }

    /// Deserializes an id from the layout written by [`TransactionId::to_bytes`].
    ///
    /// Returns `None` when `bytes` is shorter than the 8-byte id.
    ///
    /// Decoding of the external id is lenient: if the 4-byte length prefix is
    /// missing or incomplete, or it announces more bytes than are available,
    /// the result carries no external id instead of failing. Invalid UTF-8 in
    /// the payload is replaced lossily (`String::from_utf8_lossy`).
    ///
    /// A 12-byte input whose length prefix is zero decodes to an empty
    /// external id, so `with_external(id, String::new())` round-trips.
    #[must_use]
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        let mut id_raw = [0u8; 8];
        id_raw.copy_from_slice(bytes.get(..8)?);
        let id = u64::from_le_bytes(id_raw);

        // `get(8..12)` succeeds for any input of at least 12 bytes, which
        // includes the exact-12-byte encoding of an empty external id.
        let external_id = bytes.get(8..12).and_then(|prefix| {
            let mut len_raw = [0u8; 4];
            len_raw.copy_from_slice(prefix);
            let len = u32::from_le_bytes(len_raw) as usize;

            // Checked arithmetic: `12 + len` could wrap where `usize` is 32 bits.
            let end = 12usize.checked_add(len)?;
            let payload = bytes.get(12..end)?;
            Some(String::from_utf8_lossy(payload).into_owned())
        });

        Some(Self { id, external_id })
    }
}
80
81impl std::fmt::Display for TransactionId {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 if let Some(ref ext) = self.external_id {
84 write!(f, "tx-{}-{}", self.id, ext)
85 } else {
86 write!(f, "tx-{}", self.id)
87 }
88 }
89}
90
/// Set of boolean capability flags for a sink.
///
/// All flags start out `false` (the derived `Default`). Flags are switched on
/// with the chainable `with_*` builders and read back through the matching
/// `supports_*` accessors.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)]
pub struct SinkCapabilities {
    /// Flag reported by [`SinkCapabilities::supports_transactions`].
    transactions: bool,
    /// Flag reported by [`SinkCapabilities::supports_idempotent_writes`].
    idempotent_writes: bool,
    /// Flag reported by [`SinkCapabilities::supports_upsert`].
    upsert: bool,
    /// Flag reported by [`SinkCapabilities::supports_changelog`].
    changelog: bool,
    /// Flag reported by [`SinkCapabilities::supports_two_phase_commit`].
    two_phase_commit: bool,
    /// Flag reported by [`SinkCapabilities::supports_partitioned`].
    partitioned: bool,
}

impl SinkCapabilities {
    /// Creates a capability set with every flag cleared.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the set with the transactions flag enabled.
    #[must_use]
    pub fn with_transactions(self) -> Self {
        Self {
            transactions: true,
            ..self
        }
    }

    /// Returns the set with the idempotent-writes flag enabled.
    #[must_use]
    pub fn with_idempotent_writes(self) -> Self {
        Self {
            idempotent_writes: true,
            ..self
        }
    }

    /// Returns the set with the upsert flag enabled.
    #[must_use]
    pub fn with_upsert(self) -> Self {
        Self {
            upsert: true,
            ..self
        }
    }

    /// Returns the set with the changelog flag enabled.
    #[must_use]
    pub fn with_changelog_support(self) -> Self {
        Self {
            changelog: true,
            ..self
        }
    }

    /// Returns the set with the two-phase-commit flag enabled.
    #[must_use]
    pub fn with_two_phase_commit(self) -> Self {
        Self {
            two_phase_commit: true,
            ..self
        }
    }

    /// Returns the set with the partitioned flag enabled.
    #[must_use]
    pub fn with_partitioned(self) -> Self {
        Self {
            partitioned: true,
            ..self
        }
    }

    /// Whether the transactions flag is set.
    #[must_use]
    pub fn supports_transactions(&self) -> bool {
        self.transactions
    }

    /// Whether the idempotent-writes flag is set.
    #[must_use]
    pub fn supports_idempotent_writes(&self) -> bool {
        self.idempotent_writes
    }

    /// Whether the upsert flag is set.
    #[must_use]
    pub fn supports_upsert(&self) -> bool {
        self.upsert
    }

    /// Whether the changelog flag is set.
    #[must_use]
    pub fn supports_changelog(&self) -> bool {
        self.changelog
    }

    /// Whether the two-phase-commit flag is set.
    #[must_use]
    pub fn supports_two_phase_commit(&self) -> bool {
        self.two_phase_commit
    }

    /// Whether the partitioned flag is set.
    #[must_use]
    pub fn supports_partitioned(&self) -> bool {
        self.partitioned
    }
}
194
/// State a sink can be in.
///
/// The predicates on this type report which operations each state permits:
/// writing, beginning a transaction, and committing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkState {
    /// Permits writing and beginning a transaction.
    Idle,
    /// Permits writing and committing.
    InTransaction,
    /// Permits committing.
    Preparing,
    /// Permits none of the operations checked by the predicates below.
    Committing,
    /// Permits none of the operations checked by the predicates below.
    Error,
    /// Permits none of the operations checked by the predicates below.
    Closed,
}

impl SinkState {
    /// Returns `true` for [`SinkState::Idle`] and [`SinkState::InTransaction`].
    #[must_use]
    pub fn can_write(&self) -> bool {
        *self == Self::Idle || *self == Self::InTransaction
    }

    /// Returns `true` only for [`SinkState::Idle`].
    #[must_use]
    pub fn can_begin_transaction(&self) -> bool {
        *self == Self::Idle
    }

    /// Returns `true` for [`SinkState::InTransaction`] and [`SinkState::Preparing`].
    #[must_use]
    pub fn can_commit(&self) -> bool {
        *self == Self::InTransaction || *self == Self::Preparing
    }
}
231
232pub trait ExactlyOnceSink: Send {
253 fn capabilities(&self) -> SinkCapabilities;
255
256 fn state(&self) -> SinkState;
258
259 fn begin_transaction(&mut self) -> Result<TransactionId, SinkError>;
266
267 fn write(&mut self, tx_id: &TransactionId, outputs: Vec<Output>) -> Result<(), SinkError>;
281
282 fn commit(&mut self, tx_id: &TransactionId) -> Result<(), SinkError>;
295
296 fn rollback(&mut self, tx_id: &TransactionId) -> Result<(), SinkError>;
309
310 fn checkpoint(&self) -> SinkCheckpoint;
315
316 fn restore(&mut self, checkpoint: &SinkCheckpoint) -> Result<(), SinkError>;
325
326 fn flush(&mut self) -> Result<(), SinkError>;
332
333 fn close(&mut self) -> Result<(), SinkError>;
339}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// An id built with `new` has no external id and renders as `tx-<id>`.
    #[test]
    fn test_transaction_id_new() {
        let tx = TransactionId::new(123);
        assert_eq!(tx.id(), 123);
        assert!(tx.external_id().is_none());
        assert_eq!(tx.to_string(), "tx-123");
    }

    /// An id built with `with_external` exposes and renders its external id.
    #[test]
    fn test_transaction_id_with_external() {
        let external = "kafka-tx-abc";
        let tx = TransactionId::with_external(456, external.to_string());
        assert_eq!(tx.id(), 456);
        assert_eq!(tx.external_id(), Some(external));
        assert!(tx.to_string().contains(external));
    }

    /// `to_bytes` followed by `from_bytes` yields an equal id.
    #[test]
    fn test_transaction_id_serialization() {
        let tx = TransactionId::with_external(789, "test".to_string());
        let encoded = tx.to_bytes();
        assert_eq!(TransactionId::from_bytes(&encoded), Some(tx));
    }

    /// Flags are off by default and only the requested ones get enabled.
    #[test]
    fn test_sink_capabilities() {
        let empty = SinkCapabilities::new();
        assert!(!empty.supports_transactions());
        assert!(!empty.supports_upsert());

        let configured = SinkCapabilities::new().with_transactions().with_upsert();
        assert!(configured.supports_transactions());
        assert!(configured.supports_upsert());
        assert!(!configured.supports_changelog());
    }

    /// Checks the operation predicates for the states exercised here.
    #[test]
    fn test_sink_state_transitions() {
        let idle = SinkState::Idle;
        assert!(idle.can_begin_transaction());
        assert!(idle.can_write());
        assert!(!idle.can_commit());

        let in_tx = SinkState::InTransaction;
        assert!(!in_tx.can_begin_transaction());
        assert!(in_tx.can_write());
        assert!(in_tx.can_commit());

        for state in &[SinkState::Error, SinkState::Closed] {
            assert!(!state.can_write());
        }
    }
}