kstone_core/
stream.rs

//! Stream support for Change Data Capture (Phase 3.4)
//!
//! Provides DynamoDB-style streams that capture item-level modifications
//! (INSERT, MODIFY, REMOVE) with configurable view types.
5
6use crate::{Key, Item};
7use serde::{Deserialize, Serialize};
8
/// Stream view type - controls what data is included in stream records
///
/// The [`StreamRecord`] constructors take one of these values and use it to
/// decide whether `old_image` / `new_image` are populated or left as `None`.
/// The record's key is stored regardless of the view type.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so renaming or
/// reordering variants presumably changes the serialized form — confirm
/// against the serde format in use before touching them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamViewType {
    /// Only the key attributes of the item; both images are omitted for
    /// every event type
    KeysOnly,
    /// The entire item as it appears after modification (attached to insert
    /// and modify records; a remove record carries no image in this mode)
    NewImage,
    /// The entire item as it appeared before modification (attached to
    /// modify and remove records; an insert record carries no image in this
    /// mode)
    OldImage,
    /// Both the new and old images, each included when the event type has one
    NewAndOldImages,
}
21
22impl Default for StreamViewType {
23    fn default() -> Self {
24        StreamViewType::NewAndOldImages
25    }
26}
27
/// Stream event type
///
/// Identifies which kind of item-level change a [`StreamRecord`] describes.
/// Each variant is set by the `StreamRecord` constructor of the same name
/// (`insert`, `modify`, `remove`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamEventType {
    /// A new item was added to the table
    Insert,
    /// An existing item was updated
    Modify,
    /// An item was deleted from the table
    Remove,
}
38
/// A stream record capturing a single item change
///
/// Build records with [`StreamRecord::insert`], [`StreamRecord::modify`] or
/// [`StreamRecord::remove`]; those constructors apply the stream's
/// [`StreamViewType`] to decide which images are kept.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Sequence number (globally unique, monotonically increasing)
    ///
    /// The value is supplied by the caller and stored as-is; uniqueness and
    /// ordering are the caller's responsibility, not enforced by this type.
    pub sequence_number: u64,
    /// Type of change event
    pub event_type: StreamEventType,
    /// The key of the modified item
    pub key: Key,
    /// The item before modification (if applicable and view type includes it)
    ///
    /// Always `None` for insert records.
    pub old_image: Option<Item>,
    /// The item after modification (if applicable and view type includes it)
    ///
    /// Always `None` for remove records.
    pub new_image: Option<Item>,
    /// Approximate timestamp when the change occurred (milliseconds since epoch)
    ///
    /// Taken from the system clock at the moment the record is constructed.
    pub timestamp: i64,
}
55
56impl StreamRecord {
57    /// Create an INSERT record
58    pub fn insert(sequence_number: u64, key: Key, new_image: Item, view_type: StreamViewType) -> Self {
59        let (old, new) = match view_type {
60            StreamViewType::KeysOnly => (None, None),
61            StreamViewType::NewImage | StreamViewType::NewAndOldImages => (None, Some(new_image)),
62            StreamViewType::OldImage => (None, None), // No old image for insert
63        };
64
65        Self {
66            sequence_number,
67            event_type: StreamEventType::Insert,
68            key,
69            old_image: old,
70            new_image: new,
71            timestamp: current_timestamp_millis(),
72        }
73    }
74
75    /// Create a MODIFY record
76    pub fn modify(
77        sequence_number: u64,
78        key: Key,
79        old_image: Item,
80        new_image: Item,
81        view_type: StreamViewType,
82    ) -> Self {
83        let (old, new) = match view_type {
84            StreamViewType::KeysOnly => (None, None),
85            StreamViewType::NewImage => (None, Some(new_image)),
86            StreamViewType::OldImage => (Some(old_image), None),
87            StreamViewType::NewAndOldImages => (Some(old_image), Some(new_image)),
88        };
89
90        Self {
91            sequence_number,
92            event_type: StreamEventType::Modify,
93            key,
94            old_image: old,
95            new_image: new,
96            timestamp: current_timestamp_millis(),
97        }
98    }
99
100    /// Create a REMOVE record
101    pub fn remove(sequence_number: u64, key: Key, old_image: Item, view_type: StreamViewType) -> Self {
102        let (old, new) = match view_type {
103            StreamViewType::KeysOnly => (None, None),
104            StreamViewType::OldImage | StreamViewType::NewAndOldImages => (Some(old_image), None),
105            StreamViewType::NewImage => (None, None), // No new image for remove
106        };
107
108        Self {
109            sequence_number,
110            event_type: StreamEventType::Remove,
111            key,
112            old_image: old,
113            new_image: new,
114            timestamp: current_timestamp_millis(),
115        }
116    }
117}
118
/// Stream configuration
///
/// `Default` yields a disabled stream with `NewAndOldImages` and a buffer of
/// 1000 records; use [`StreamConfig::enabled`] plus the `with_*` builders to
/// customise it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Whether streams are enabled
    pub enabled: bool,
    /// What data to include in stream records
    pub view_type: StreamViewType,
    /// Maximum number of records to retain in the stream buffer
    /// (oldest records are dropped when buffer is full)
    ///
    /// NOTE(review): the buffer itself is not implemented in this file, so
    /// the drop-oldest behavior is the consumer's contract — confirm there.
    /// No validation is applied to this value here (zero is accepted).
    pub buffer_size: usize,
}
130
131impl Default for StreamConfig {
132    fn default() -> Self {
133        Self {
134            enabled: false,
135            view_type: StreamViewType::NewAndOldImages,
136            buffer_size: 1000,
137        }
138    }
139}
140
141impl StreamConfig {
142    /// Create a new stream config with streams enabled
143    pub fn enabled() -> Self {
144        Self {
145            enabled: true,
146            ..Default::default()
147        }
148    }
149
150    /// Set the view type
151    pub fn with_view_type(mut self, view_type: StreamViewType) -> Self {
152        self.view_type = view_type;
153        self
154    }
155
156    /// Set the buffer size
157    pub fn with_buffer_size(mut self, size: usize) -> Self {
158        self.buffer_size = size;
159        self
160    }
161}
162
/// Get current timestamp in milliseconds since epoch
///
/// Reads the system wall clock. Never panics: if the clock is set earlier
/// than the Unix epoch the result is the (negative) number of milliseconds
/// before the epoch, and values that do not fit in an `i64` saturate instead
/// of wrapping.
fn current_timestamp_millis() -> i64 {
    /// Convert a duration to whole milliseconds, clamped to `i64::MAX`.
    fn clamp_millis(span: std::time::Duration) -> i64 {
        // `as_millis` yields a u128; clamp first so the cast cannot wrap.
        span.as_millis().min(i64::MAX as u128) as i64
    }

    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => clamp_millis(since_epoch),
        // A misconfigured clock (before 1970) must not crash record creation;
        // the error carries how far before the epoch the clock currently is.
        Err(before_epoch) => -clamp_millis(before_epoch.duration()),
    }
}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;

    /// Key shared by the record-building tests.
    fn sample_key() -> Key {
        Key::new(b"user#123".to_vec())
    }

    /// Single-attribute item whose `name` attribute holds `name`.
    fn item_named(name: &str) -> Item {
        let mut item = HashMap::new();
        item.insert("name".to_string(), Value::string(name));
        item
    }

    #[test]
    fn test_stream_view_type_default() {
        assert_eq!(StreamViewType::default(), StreamViewType::NewAndOldImages);
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.view_type, StreamViewType::NewAndOldImages);
        assert_eq!(config.buffer_size, 1000);
    }

    #[test]
    fn test_stream_config_enabled() {
        let config = StreamConfig::enabled()
            .with_view_type(StreamViewType::KeysOnly)
            .with_buffer_size(500);

        assert!(config.enabled);
        assert_eq!(config.view_type, StreamViewType::KeysOnly);
        assert_eq!(config.buffer_size, 500);
    }

    #[test]
    fn test_stream_record_insert() {
        let key = sample_key();

        // The key is compared afterwards, so this is the one clone that is needed.
        let record = StreamRecord::insert(
            1,
            key.clone(),
            item_named("Alice"),
            StreamViewType::NewImage,
        );

        assert_eq!(record.sequence_number, 1);
        assert_eq!(record.event_type, StreamEventType::Insert);
        assert_eq!(record.key, key);
        assert!(record.old_image.is_none());
        assert!(record.new_image.is_some());
    }

    #[test]
    fn test_stream_record_modify() {
        let record = StreamRecord::modify(
            2,
            sample_key(),
            item_named("Alice"),
            item_named("Bob"),
            StreamViewType::NewAndOldImages,
        );

        assert_eq!(record.sequence_number, 2);
        assert_eq!(record.event_type, StreamEventType::Modify);
        assert!(record.old_image.is_some());
        assert!(record.new_image.is_some());
    }

    #[test]
    fn test_stream_record_remove() {
        let record = StreamRecord::remove(
            3,
            sample_key(),
            item_named("Alice"),
            StreamViewType::OldImage,
        );

        assert_eq!(record.sequence_number, 3);
        assert_eq!(record.event_type, StreamEventType::Remove);
        assert!(record.old_image.is_some());
        assert!(record.new_image.is_none());
    }

    #[test]
    fn test_stream_record_keys_only() {
        let record = StreamRecord::insert(
            1,
            sample_key(),
            item_named("Alice"),
            StreamViewType::KeysOnly,
        );

        assert!(record.old_image.is_none());
        assert!(record.new_image.is_none());
    }

    /// A view type that asks for an image the event cannot have must yield a
    /// record with no images at all.
    #[test]
    fn test_stream_record_inapplicable_view_type() {
        let inserted = StreamRecord::insert(
            4,
            sample_key(),
            item_named("Alice"),
            StreamViewType::OldImage,
        );
        assert_eq!(inserted.event_type, StreamEventType::Insert);
        assert!(inserted.old_image.is_none());
        assert!(inserted.new_image.is_none());

        let removed = StreamRecord::remove(
            5,
            sample_key(),
            item_named("Alice"),
            StreamViewType::NewImage,
        );
        assert_eq!(removed.event_type, StreamEventType::Remove);
        assert!(removed.old_image.is_none());
        assert!(removed.new_image.is_none());
    }
}
263}