rs2_stream/state/
traits.rs1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
5#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum StateStorageType {
8 InMemory,
9 Custom, }
11
12#[async_trait]
14pub trait StateStorage {
15 async fn get(&self, key: &str) -> Option<Vec<u8>>;
16 async fn set(
17 &self,
18 key: &str,
19 value: &[u8],
20 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
21 async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
22 async fn exists(&self, key: &str) -> bool;
23 async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
24}
25
26#[derive(Debug, thiserror::Error)]
28pub enum StateError {
29 #[error("Storage error: {0}")]
30 Storage(String),
31
32 #[error("Serialization error: {0}")]
33 Serialization(#[from] serde_json::Error),
34
35 #[error("Validation error: {0}")]
36 Validation(String),
37}
38
39pub type StateResult<T> = Result<T, StateError>;
40
41pub trait KeyExtractor<T> {
43 fn extract_key(&self, event: &T) -> String;
44}
45
46pub struct FieldKeyExtractor {
48 field_name: String,
49}
50
51impl FieldKeyExtractor {
52 pub fn new(field_name: &str) -> Self {
53 Self {
54 field_name: field_name.to_string(),
55 }
56 }
57}
58
59impl<T> KeyExtractor<T> for FieldKeyExtractor
60where T: Serialize {
61 fn extract_key(&self, event: &T) -> String {
62 match serde_json::to_value(event) {
63 Ok(value) => {
64 let field_value = if self.field_name.contains('.') {
66 self.extract_nested_field(&value)
67 } else {
68 value.get(&self.field_name)
69 };
70
71 match field_value {
72 Some(Value::String(s)) => s.clone(),
73 Some(Value::Number(n)) => n.to_string(),
74 Some(Value::Bool(b)) => b.to_string(),
75 Some(Value::Null) => "null".to_string(),
76 Some(Value::Array(_) | Value::Object(_)) => {
77 serde_json::to_string(field_value.unwrap())
78 .unwrap_or_else(|_| "invalid_complex_type".to_string())
79 }
80 None => format!("missing_field_{}", self.field_name),
81 }
82 }
83 Err(e) => {
84 format!("serialization_error_{}_{}",
85 self.field_name,
86 e.to_string().chars().take(10).collect::<String>())
87 }
88 }
89 }
90}
91
92impl FieldKeyExtractor {
93 fn extract_nested_field<'a>(&self, value: &'a Value) -> Option<&'a Value> {
95 let parts: Vec<&str> = self.field_name.split('.').collect();
96 let mut current = value;
97
98 for part in parts {
99 current = current.get(part)?;
100 }
101
102 Some(current)
103 }
104
105}
106
107#[derive(Clone)]
109pub struct CustomKeyExtractor<F> {
110 extractor: F,
111}
112
113impl<F> CustomKeyExtractor<F> {
114 pub fn new(extractor: F) -> Self {
115 Self { extractor }
116 }
117}
118
119impl<T, F> KeyExtractor<T> for CustomKeyExtractor<F>
120where
121 F: Fn(&T) -> String + Clone,
122{
123 fn extract_key(&self, event: &T) -> String {
124 (self.extractor)(event)
125 }
126}