rust_logic_graph/distributed/
context.rs

1//! Distributed Context with Serialization
2//!
3//! Provides context management with efficient serialization for remote execution.
4
5use serde::{Serialize, Deserialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use anyhow::{Result, Context as AnyhowContext};
11
12/// A distributed context that can be serialized and shared across services
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct DistributedContext {
15    /// Unique session identifier
16    pub session_id: String,
17    
18    /// Context data
19    pub data: HashMap<String, Value>,
20    
21    /// Metadata for tracking
22    pub metadata: ContextMetadata,
23}
24
25/// Metadata for distributed context
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ContextMetadata {
28    /// Creation timestamp (Unix timestamp in milliseconds)
29    pub created_at: u64,
30    
31    /// Last updated timestamp
32    pub updated_at: u64,
33    
34    /// Version number for conflict resolution
35    pub version: u64,
36    
37    /// Service that last modified this context
38    pub modified_by: Option<String>,
39    
40    /// Tags for categorization
41    pub tags: Vec<String>,
42}
43
44impl DistributedContext {
45    /// Create a new distributed context
46    ///
47    /// # Example
48    ///
49    /// ```
50    /// use rust_logic_graph::distributed::DistributedContext;
51    ///
52    /// let context = DistributedContext::new("session-123");
53    /// assert_eq!(context.session_id, "session-123");
54    /// ```
55    pub fn new(session_id: impl Into<String>) -> Self {
56        let now = std::time::SystemTime::now()
57            .duration_since(std::time::UNIX_EPOCH)
58            .unwrap()
59            .as_millis() as u64;
60        
61        Self {
62            session_id: session_id.into(),
63            data: HashMap::new(),
64            metadata: ContextMetadata {
65                created_at: now,
66                updated_at: now,
67                version: 1,
68                modified_by: None,
69                tags: Vec::new(),
70            },
71        }
72    }
73    
74    /// Set a value in the context
75    pub fn set(&mut self, key: impl Into<String>, value: Value) {
76        self.data.insert(key.into(), value);
77        self.bump_version();
78    }
79    
80    /// Get a value from the context
81    pub fn get(&self, key: &str) -> Option<&Value> {
82        self.data.get(key)
83    }
84    
85    /// Remove a value from the context
86    pub fn remove(&mut self, key: &str) -> Option<Value> {
87        let result = self.data.remove(key);
88        if result.is_some() {
89            self.bump_version();
90        }
91        result
92    }
93    
94    /// Serialize context to bytes for transmission
95    ///
96    /// Uses MessagePack for efficient binary serialization
97    pub fn serialize(&self) -> Result<Vec<u8>> {
98        rmp_serde::to_vec(self)
99            .context("Failed to serialize distributed context")
100    }
101    
102    /// Deserialize context from bytes
103    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
104        rmp_serde::from_slice(bytes)
105            .context("Failed to deserialize distributed context")
106    }
107    
108    /// Serialize to JSON (for debugging/human-readable)
109    pub fn to_json(&self) -> Result<String> {
110        serde_json::to_string_pretty(self)
111            .context("Failed to serialize context to JSON")
112    }
113    
114    /// Deserialize from JSON
115    pub fn from_json(json: &str) -> Result<Self> {
116        serde_json::from_str(json)
117            .context("Failed to deserialize context from JSON")
118    }
119    
120    /// Create a snapshot of current context state
121    pub fn snapshot(&self) -> ContextSnapshot {
122        ContextSnapshot {
123            session_id: self.session_id.clone(),
124            data: self.data.clone(),
125            version: self.metadata.version,
126            timestamp: self.metadata.updated_at,
127        }
128    }
129    
130    /// Merge another context into this one
131    ///
132    /// Performs a simple merge where newer values win
133    pub fn merge(&mut self, other: &DistributedContext) {
134        for (key, value) in &other.data {
135            self.data.insert(key.clone(), value.clone());
136        }
137        self.bump_version();
138    }
139    
140    /// Increment version and update timestamp
141    fn bump_version(&mut self) {
142        self.metadata.version += 1;
143        self.metadata.updated_at = std::time::SystemTime::now()
144            .duration_since(std::time::UNIX_EPOCH)
145            .unwrap()
146            .as_millis() as u64;
147    }
148    
149    /// Add a tag to the context
150    pub fn add_tag(&mut self, tag: impl Into<String>) {
151        let tag = tag.into();
152        if !self.metadata.tags.contains(&tag) {
153            self.metadata.tags.push(tag);
154        }
155    }
156    
157    /// Set the service that modified this context
158    pub fn set_modified_by(&mut self, service: impl Into<String>) {
159        self.metadata.modified_by = Some(service.into());
160        self.bump_version();
161    }
162}
163
164/// A lightweight snapshot of context state
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ContextSnapshot {
167    pub session_id: String,
168    pub data: HashMap<String, Value>,
169    pub version: u64,
170    pub timestamp: u64,
171}
172
173/// Thread-safe wrapper for distributed context
174#[derive(Debug, Clone)]
175pub struct SharedContext {
176    inner: Arc<RwLock<DistributedContext>>,
177}
178
179impl SharedContext {
180    /// Create a new shared context
181    pub fn new(session_id: impl Into<String>) -> Self {
182        Self {
183            inner: Arc::new(RwLock::new(DistributedContext::new(session_id))),
184        }
185    }
186    
187    /// Get a value from the context
188    pub async fn get(&self, key: &str) -> Option<Value> {
189        let ctx = self.inner.read().await;
190        ctx.get(key).cloned()
191    }
192    
193    /// Set a value in the context
194    pub async fn set(&self, key: impl Into<String>, value: Value) {
195        let mut ctx = self.inner.write().await;
196        ctx.set(key, value);
197    }
198    
199    /// Serialize the context
200    pub async fn serialize(&self) -> Result<Vec<u8>> {
201        let ctx = self.inner.read().await;
202        ctx.serialize()
203    }
204    
205    /// Get current version
206    pub async fn version(&self) -> u64 {
207        let ctx = self.inner.read().await;
208        ctx.metadata.version
209    }
210    
211    /// Create a snapshot
212    pub async fn snapshot(&self) -> ContextSnapshot {
213        let ctx = self.inner.read().await;
214        ctx.snapshot()
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A fresh context keeps its session id and starts at version 1.
    #[test]
    fn test_context_creation() {
        let fresh = DistributedContext::new("test-session");

        assert_eq!(fresh.session_id, "test-session");
        assert_eq!(fresh.metadata.version, 1);
    }

    /// `set` stores the value and bumps the version once.
    #[test]
    fn test_set_and_get() {
        let mut context = DistributedContext::new("test");
        context.set("key1", json!("value1"));

        let stored = context.get("key1");
        assert_eq!(stored, Some(&json!("value1")));
        assert_eq!(context.metadata.version, 2);
    }

    /// A MessagePack round trip preserves the session id and the data.
    #[test]
    fn test_serialization() {
        let mut original = DistributedContext::new("test");
        original.set("user_id", json!("user-123"));
        original.set("count", json!(42));

        let restored = DistributedContext::deserialize(&original.serialize().unwrap()).unwrap();

        assert_eq!(restored.session_id, "test");
        assert_eq!(restored.get("user_id"), Some(&json!("user-123")));
        assert_eq!(restored.get("count"), Some(&json!(42)));
    }

    /// A JSON round trip preserves the session id and the data.
    #[test]
    fn test_json_serialization() {
        let mut original = DistributedContext::new("test");
        original.set("name", json!("Alice"));

        let restored = DistributedContext::from_json(&original.to_json().unwrap()).unwrap();

        assert_eq!(restored.session_id, "test");
        assert_eq!(restored.get("name"), Some(&json!("Alice")));
    }

    /// Merging keeps the target's own keys and adds the keys of the other context.
    #[test]
    fn test_merge() {
        let mut target = DistributedContext::new("test");
        target.set("key1", json!("value1"));

        let mut incoming = DistributedContext::new("test");
        incoming.set("key2", json!("value2"));

        target.merge(&incoming);

        assert_eq!(target.get("key1"), Some(&json!("value1")));
        assert_eq!(target.get("key2"), Some(&json!("value2")));
    }

    /// The shared wrapper forwards `set`/`get` and exposes the bumped version.
    #[tokio::test]
    async fn test_shared_context() {
        let shared = SharedContext::new("test");

        shared.set("key1", json!("value1")).await;
        let fetched = shared.get("key1").await;

        assert_eq!(fetched, Some(json!("value1")));
        assert_eq!(shared.version().await, 2);
    }
}