rust_logic_graph/distributed/
context.rs1use serde::{Serialize, Deserialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use anyhow::{Result, Context as AnyhowContext};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct DistributedContext {
15 pub session_id: String,
17
18 pub data: HashMap<String, Value>,
20
21 pub metadata: ContextMetadata,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ContextMetadata {
28 pub created_at: u64,
30
31 pub updated_at: u64,
33
34 pub version: u64,
36
37 pub modified_by: Option<String>,
39
40 pub tags: Vec<String>,
42}
43
44impl DistributedContext {
45 pub fn new(session_id: impl Into<String>) -> Self {
56 let now = std::time::SystemTime::now()
57 .duration_since(std::time::UNIX_EPOCH)
58 .unwrap()
59 .as_millis() as u64;
60
61 Self {
62 session_id: session_id.into(),
63 data: HashMap::new(),
64 metadata: ContextMetadata {
65 created_at: now,
66 updated_at: now,
67 version: 1,
68 modified_by: None,
69 tags: Vec::new(),
70 },
71 }
72 }
73
74 pub fn set(&mut self, key: impl Into<String>, value: Value) {
76 self.data.insert(key.into(), value);
77 self.bump_version();
78 }
79
80 pub fn get(&self, key: &str) -> Option<&Value> {
82 self.data.get(key)
83 }
84
85 pub fn remove(&mut self, key: &str) -> Option<Value> {
87 let result = self.data.remove(key);
88 if result.is_some() {
89 self.bump_version();
90 }
91 result
92 }
93
94 pub fn serialize(&self) -> Result<Vec<u8>> {
98 rmp_serde::to_vec(self)
99 .context("Failed to serialize distributed context")
100 }
101
102 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
104 rmp_serde::from_slice(bytes)
105 .context("Failed to deserialize distributed context")
106 }
107
108 pub fn to_json(&self) -> Result<String> {
110 serde_json::to_string_pretty(self)
111 .context("Failed to serialize context to JSON")
112 }
113
114 pub fn from_json(json: &str) -> Result<Self> {
116 serde_json::from_str(json)
117 .context("Failed to deserialize context from JSON")
118 }
119
120 pub fn snapshot(&self) -> ContextSnapshot {
122 ContextSnapshot {
123 session_id: self.session_id.clone(),
124 data: self.data.clone(),
125 version: self.metadata.version,
126 timestamp: self.metadata.updated_at,
127 }
128 }
129
130 pub fn merge(&mut self, other: &DistributedContext) {
134 for (key, value) in &other.data {
135 self.data.insert(key.clone(), value.clone());
136 }
137 self.bump_version();
138 }
139
140 fn bump_version(&mut self) {
142 self.metadata.version += 1;
143 self.metadata.updated_at = std::time::SystemTime::now()
144 .duration_since(std::time::UNIX_EPOCH)
145 .unwrap()
146 .as_millis() as u64;
147 }
148
149 pub fn add_tag(&mut self, tag: impl Into<String>) {
151 let tag = tag.into();
152 if !self.metadata.tags.contains(&tag) {
153 self.metadata.tags.push(tag);
154 }
155 }
156
157 pub fn set_modified_by(&mut self, service: impl Into<String>) {
159 self.metadata.modified_by = Some(service.into());
160 self.bump_version();
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ContextSnapshot {
167 pub session_id: String,
168 pub data: HashMap<String, Value>,
169 pub version: u64,
170 pub timestamp: u64,
171}
172
173#[derive(Debug, Clone)]
175pub struct SharedContext {
176 inner: Arc<RwLock<DistributedContext>>,
177}
178
179impl SharedContext {
180 pub fn new(session_id: impl Into<String>) -> Self {
182 Self {
183 inner: Arc::new(RwLock::new(DistributedContext::new(session_id))),
184 }
185 }
186
187 pub async fn get(&self, key: &str) -> Option<Value> {
189 let ctx = self.inner.read().await;
190 ctx.get(key).cloned()
191 }
192
193 pub async fn set(&self, key: impl Into<String>, value: Value) {
195 let mut ctx = self.inner.write().await;
196 ctx.set(key, value);
197 }
198
199 pub async fn serialize(&self) -> Result<Vec<u8>> {
201 let ctx = self.inner.read().await;
202 ctx.serialize()
203 }
204
205 pub async fn version(&self) -> u64 {
207 let ctx = self.inner.read().await;
208 ctx.metadata.version
209 }
210
211 pub async fn snapshot(&self) -> ContextSnapshot {
213 let ctx = self.inner.read().await;
214 ctx.snapshot()
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use serde_json::json;
222
223 #[test]
224 fn test_context_creation() {
225 let ctx = DistributedContext::new("test-session");
226 assert_eq!(ctx.session_id, "test-session");
227 assert_eq!(ctx.metadata.version, 1);
228 }
229
230 #[test]
231 fn test_set_and_get() {
232 let mut ctx = DistributedContext::new("test");
233 ctx.set("key1", json!("value1"));
234
235 assert_eq!(ctx.get("key1"), Some(&json!("value1")));
236 assert_eq!(ctx.metadata.version, 2);
237 }
238
239 #[test]
240 fn test_serialization() {
241 let mut ctx = DistributedContext::new("test");
242 ctx.set("user_id", json!("user-123"));
243 ctx.set("count", json!(42));
244
245 let bytes = ctx.serialize().unwrap();
246 let deserialized = DistributedContext::deserialize(&bytes).unwrap();
247
248 assert_eq!(deserialized.session_id, "test");
249 assert_eq!(deserialized.get("user_id"), Some(&json!("user-123")));
250 assert_eq!(deserialized.get("count"), Some(&json!(42)));
251 }
252
253 #[test]
254 fn test_json_serialization() {
255 let mut ctx = DistributedContext::new("test");
256 ctx.set("name", json!("Alice"));
257
258 let json_str = ctx.to_json().unwrap();
259 let deserialized = DistributedContext::from_json(&json_str).unwrap();
260
261 assert_eq!(deserialized.session_id, "test");
262 assert_eq!(deserialized.get("name"), Some(&json!("Alice")));
263 }
264
265 #[test]
266 fn test_merge() {
267 let mut ctx1 = DistributedContext::new("test");
268 ctx1.set("key1", json!("value1"));
269
270 let mut ctx2 = DistributedContext::new("test");
271 ctx2.set("key2", json!("value2"));
272
273 ctx1.merge(&ctx2);
274
275 assert_eq!(ctx1.get("key1"), Some(&json!("value1")));
276 assert_eq!(ctx1.get("key2"), Some(&json!("value2")));
277 }
278
279 #[tokio::test]
280 async fn test_shared_context() {
281 let ctx = SharedContext::new("test");
282
283 ctx.set("key1", json!("value1")).await;
284 let value = ctx.get("key1").await;
285
286 assert_eq!(value, Some(json!("value1")));
287 assert_eq!(ctx.version().await, 2);
288 }
289}