// rs2_stream/state/traits.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
/// Types of state storage backends.
///
/// This is a fieldless tag, so it is `Copy` and `Hash` in addition to being
/// comparable: it can be passed by value and used as a key in hashed
/// collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StateStorageType {
    /// In-memory backend (presumably the built-in one — confirm against the
    /// storage implementations).
    InMemory,
    /// For user-defined storage backends.
    Custom,
}
11
/// Trait for state storage backends (object-safe version).
///
/// Every operation is an `async fn` taking `&self`, made usable inside a
/// trait by `#[async_trait]`. Keys are string slices and values are opaque
/// byte buffers. Fallible operations report failure as a boxed
/// `std::error::Error + Send + Sync`.
#[async_trait]
pub trait StateStorage {
    /// Fetches the bytes associated with `key`.
    ///
    /// The signature has no error channel, so `None` is the only way to say
    /// "nothing to return". Presumably that means the key is absent; confirm
    /// with the concrete backend how read failures are surfaced.
    async fn get(&self, key: &str) -> Option<Vec<u8>>;
    /// Stores `value` under `key`.
    ///
    /// Returns `Ok(())` on success or the backend's boxed error on failure.
    /// Whether an existing value is overwritten is up to the implementation
    /// (presumably yes — verify against the backend).
    async fn set(
        &self,
        key: &str,
        value: &[u8],
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Removes the entry for `key`.
    ///
    /// Returns the backend's boxed error on failure. Whether deleting a
    /// key that is not present counts as an error is implementation-defined.
    async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Reports whether the backend holds an entry for `key`.
    async fn exists(&self, key: &str) -> bool;
    /// Removes entries from the backend, returning its boxed error on failure.
    ///
    /// NOTE(review): presumably this wipes every key the backend owns —
    /// confirm the scope with each implementation.
    async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
25
/// State management error types.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute; `{0}` is replaced by the variant's payload.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// Storage-related failure, carried as a plain message string.
    #[error("Storage error: {0}")]
    Storage(String),

    /// JSON serialization or deserialization failure from `serde_json`.
    ///
    /// `#[from]` generates `From<serde_json::Error>`, so the `?` operator
    /// converts such errors into this variant automatically.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Validation failure; the string describes what was rejected.
    #[error("Validation error: {0}")]
    Validation(String),
}

/// Shorthand for a `Result` whose error type is [`StateError`].
pub type StateResult<T> = Result<T, StateError>;
40
/// Helper trait for extracting keys from events.
///
/// `T` is the event type. Implementors turn a borrowed event into an owned
/// `String` key.
pub trait KeyExtractor<T> {
    /// Returns the key for `event`.
    ///
    /// The signature is infallible: an implementation that can fail has to
    /// encode the failure in the returned string, because there is no error
    /// value to return.
    fn extract_key(&self, event: &T) -> String;
}
45
/// Default key extractor that reads the key from a named field of the event.
///
/// The name may be a dot-separated path (for example `user.profile.id`) to
/// reach into nested objects.
#[derive(Debug, Clone)]
pub struct FieldKeyExtractor {
    /// Field name, or dot-separated path, looked up in the serialized event.
    field_name: String,
}

impl FieldKeyExtractor {
    /// Creates an extractor for `field_name`.
    ///
    /// The name is copied into the extractor, so the caller keeps ownership
    /// of the argument.
    pub fn new(field_name: &str) -> Self {
        Self {
            field_name: field_name.to_owned(),
        }
    }
}
58
59impl<T> KeyExtractor<T> for FieldKeyExtractor 
60where T: Serialize {
61    fn extract_key(&self, event: &T) -> String {
62        match serde_json::to_value(event) {
63            Ok(value) => {
64                // Support nested field paths
65                let field_value = if self.field_name.contains('.') {
66                    self.extract_nested_field(&value)
67                } else {
68                    value.get(&self.field_name)
69                };
70                
71                match field_value {
72                    Some(Value::String(s)) => s.clone(),
73                    Some(Value::Number(n)) => n.to_string(),
74                    Some(Value::Bool(b)) => b.to_string(),
75                    Some(Value::Null) => "null".to_string(),
76                    Some(Value::Array(_) | Value::Object(_)) => {
77                        serde_json::to_string(field_value.unwrap())
78                            .unwrap_or_else(|_| "invalid_complex_type".to_string())
79                    }
80                    None => format!("missing_field_{}", self.field_name),
81                }
82            }
83            Err(e) => {
84                format!("serialization_error_{}_{}", 
85                    self.field_name, 
86                    e.to_string().chars().take(10).collect::<String>())
87            }
88        }
89    }
90}
91
92impl FieldKeyExtractor {
93    /// Extract a nested field value using dot notation (e.g., "user.profile.id")
94    fn extract_nested_field<'a>(&self, value: &'a Value) -> Option<&'a Value> {
95        let parts: Vec<&str> = self.field_name.split('.').collect();
96        let mut current = value;
97
98        for part in parts {
99            current = current.get(part)?;
100        }
101
102        Some(current)
103    }
104
105}
106
107/// Custom key extractor function
108#[derive(Clone)]
109pub struct CustomKeyExtractor<F> {
110    extractor: F,
111}
112
113impl<F> CustomKeyExtractor<F> {
114    pub fn new(extractor: F) -> Self {
115        Self { extractor }
116    }
117}
118
119impl<T, F> KeyExtractor<T> for CustomKeyExtractor<F>
120where
121    F: Fn(&T) -> String + Clone,
122{
123    fn extract_key(&self, event: &T) -> String {
124        (self.extractor)(event)
125    }
126}