1use crate::{Key, Item};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11pub enum StreamViewType {
12 KeysOnly,
14 NewImage,
16 OldImage,
18 NewAndOldImages,
20}
21
22impl Default for StreamViewType {
23 fn default() -> Self {
24 StreamViewType::NewAndOldImages
25 }
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30pub enum StreamEventType {
31 Insert,
33 Modify,
35 Remove,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct StreamRecord {
42 pub sequence_number: u64,
44 pub event_type: StreamEventType,
46 pub key: Key,
48 pub old_image: Option<Item>,
50 pub new_image: Option<Item>,
52 pub timestamp: i64,
54}
55
56impl StreamRecord {
57 pub fn insert(sequence_number: u64, key: Key, new_image: Item, view_type: StreamViewType) -> Self {
59 let (old, new) = match view_type {
60 StreamViewType::KeysOnly => (None, None),
61 StreamViewType::NewImage | StreamViewType::NewAndOldImages => (None, Some(new_image)),
62 StreamViewType::OldImage => (None, None), };
64
65 Self {
66 sequence_number,
67 event_type: StreamEventType::Insert,
68 key,
69 old_image: old,
70 new_image: new,
71 timestamp: current_timestamp_millis(),
72 }
73 }
74
75 pub fn modify(
77 sequence_number: u64,
78 key: Key,
79 old_image: Item,
80 new_image: Item,
81 view_type: StreamViewType,
82 ) -> Self {
83 let (old, new) = match view_type {
84 StreamViewType::KeysOnly => (None, None),
85 StreamViewType::NewImage => (None, Some(new_image)),
86 StreamViewType::OldImage => (Some(old_image), None),
87 StreamViewType::NewAndOldImages => (Some(old_image), Some(new_image)),
88 };
89
90 Self {
91 sequence_number,
92 event_type: StreamEventType::Modify,
93 key,
94 old_image: old,
95 new_image: new,
96 timestamp: current_timestamp_millis(),
97 }
98 }
99
100 pub fn remove(sequence_number: u64, key: Key, old_image: Item, view_type: StreamViewType) -> Self {
102 let (old, new) = match view_type {
103 StreamViewType::KeysOnly => (None, None),
104 StreamViewType::OldImage | StreamViewType::NewAndOldImages => (Some(old_image), None),
105 StreamViewType::NewImage => (None, None), };
107
108 Self {
109 sequence_number,
110 event_type: StreamEventType::Remove,
111 key,
112 old_image: old,
113 new_image: new,
114 timestamp: current_timestamp_millis(),
115 }
116 }
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct StreamConfig {
122 pub enabled: bool,
124 pub view_type: StreamViewType,
126 pub buffer_size: usize,
129}
130
131impl Default for StreamConfig {
132 fn default() -> Self {
133 Self {
134 enabled: false,
135 view_type: StreamViewType::NewAndOldImages,
136 buffer_size: 1000,
137 }
138 }
139}
140
141impl StreamConfig {
142 pub fn enabled() -> Self {
144 Self {
145 enabled: true,
146 ..Default::default()
147 }
148 }
149
150 pub fn with_view_type(mut self, view_type: StreamViewType) -> Self {
152 self.view_type = view_type;
153 self
154 }
155
156 pub fn with_buffer_size(mut self, size: usize) -> Self {
158 self.buffer_size = size;
159 self
160 }
161}
162
/// Returns the current wall-clock time in milliseconds relative to the Unix epoch.
///
/// The result is positive for clocks at or after the epoch. If the system
/// clock is set *before* the epoch, the (negative) offset is returned rather
/// than panicking, so building a stream record can never abort the caller
/// because of a misconfigured clock. Values that do not fit in an `i64`
/// saturate instead of wrapping.
fn current_timestamp_millis() -> i64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // Clamp in u128 first so the narrowing cast below is always lossless.
    let to_millis = |span: Duration| span.as_millis().min(i64::MAX as u128) as i64;

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => to_millis(since_epoch),
        // `duration()` reports how far the clock sits before the epoch.
        Err(before_epoch) => -to_millis(before_epoch.duration()),
    }
}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;

    /// Key shared by every record-construction test.
    fn sample_key() -> Key {
        Key::new(b"user#123".to_vec())
    }

    /// Builds a one-attribute item whose `name` attribute is `name`.
    fn named_item(name: &'static str) -> Item {
        let mut item = HashMap::new();
        item.insert("name".to_string(), Value::string(name));
        item
    }

    #[test]
    fn test_stream_view_type_default() {
        assert_eq!(StreamViewType::default(), StreamViewType::NewAndOldImages);
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.view_type, StreamViewType::NewAndOldImages);
        assert_eq!(config.buffer_size, 1000);
    }

    #[test]
    fn test_stream_config_enabled() {
        let config = StreamConfig::enabled()
            .with_view_type(StreamViewType::KeysOnly)
            .with_buffer_size(500);

        assert!(config.enabled);
        assert_eq!(config.view_type, StreamViewType::KeysOnly);
        assert_eq!(config.buffer_size, 500);
    }

    #[test]
    fn test_stream_record_insert() {
        let key = sample_key();

        let record =
            StreamRecord::insert(1, key.clone(), named_item("Alice"), StreamViewType::NewImage);

        assert_eq!(record.sequence_number, 1);
        assert_eq!(record.event_type, StreamEventType::Insert);
        assert_eq!(record.key, key);
        assert!(record.old_image.is_none());
        assert!(record.new_image.is_some());
    }

    #[test]
    fn test_stream_record_insert_old_image_view_has_no_images() {
        // An insert has no previous contents, so an old-image-only view is empty.
        let record =
            StreamRecord::insert(1, sample_key(), named_item("Alice"), StreamViewType::OldImage);

        assert!(record.old_image.is_none());
        assert!(record.new_image.is_none());
    }

    #[test]
    fn test_stream_record_insert_new_and_old_images() {
        let record = StreamRecord::insert(
            1,
            sample_key(),
            named_item("Alice"),
            StreamViewType::NewAndOldImages,
        );

        assert!(record.old_image.is_none());
        assert!(record.new_image.is_some());
    }

    #[test]
    fn test_stream_record_modify() {
        let record = StreamRecord::modify(
            2,
            sample_key(),
            named_item("Alice"),
            named_item("Bob"),
            StreamViewType::NewAndOldImages,
        );

        assert_eq!(record.sequence_number, 2);
        assert_eq!(record.event_type, StreamEventType::Modify);
        assert!(record.old_image.is_some());
        assert!(record.new_image.is_some());
    }

    #[test]
    fn test_stream_record_modify_respects_view_type() {
        // (view type, old image expected, new image expected)
        let cases = [
            (StreamViewType::KeysOnly, false, false),
            (StreamViewType::NewImage, false, true),
            (StreamViewType::OldImage, true, false),
            (StreamViewType::NewAndOldImages, true, true),
        ];

        for &(view_type, expect_old, expect_new) in cases.iter() {
            let record = StreamRecord::modify(
                2,
                sample_key(),
                named_item("Alice"),
                named_item("Bob"),
                view_type,
            );

            assert_eq!(record.old_image.is_some(), expect_old, "{:?}", view_type);
            assert_eq!(record.new_image.is_some(), expect_new, "{:?}", view_type);
        }
    }

    #[test]
    fn test_stream_record_remove() {
        let record =
            StreamRecord::remove(3, sample_key(), named_item("Alice"), StreamViewType::OldImage);

        assert_eq!(record.sequence_number, 3);
        assert_eq!(record.event_type, StreamEventType::Remove);
        assert!(record.old_image.is_some());
        assert!(record.new_image.is_none());
    }

    #[test]
    fn test_stream_record_remove_new_image_view_has_no_images() {
        // A removal has no resulting contents, so a new-image-only view is empty.
        let record =
            StreamRecord::remove(3, sample_key(), named_item("Alice"), StreamViewType::NewImage);

        assert!(record.old_image.is_none());
        assert!(record.new_image.is_none());
    }

    #[test]
    fn test_stream_record_keys_only() {
        let record =
            StreamRecord::insert(1, sample_key(), named_item("Alice"), StreamViewType::KeysOnly);

        assert!(record.old_image.is_none());
        assert!(record.new_image.is_none());
    }
}