chaincraft_rust/
shared_object.rs

1//! Enhanced shared object implementation with application-specific logic
2
3pub use crate::shared::SharedObjectId;
4use crate::{
5    error::{ChaincraftError, Result},
6    shared::{MessageType, SharedMessage, SharedObject},
7};
8use async_trait::async_trait;
9use chrono;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha2::{Digest, Sha256};
13use std::any::Any;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18/// Enhanced shared object trait with application-specific functionality
19#[async_trait]
20pub trait ApplicationObject: Send + Sync + std::fmt::Debug {
21    /// Get the object's unique identifier
22    fn id(&self) -> &SharedObjectId;
23
24    /// Get the object's type name
25    fn type_name(&self) -> &'static str;
26
27    /// Validate if a message is valid for this object
28    async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
29
30    /// Add a validated message to the object
31    async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
32
33    /// Check if this object supports merkleized synchronization
34    fn is_merkleized(&self) -> bool;
35
36    /// Get the latest state digest
37    async fn get_latest_digest(&self) -> Result<String>;
38
39    /// Check if object has a specific digest
40    async fn has_digest(&self, digest: &str) -> Result<bool>;
41
42    /// Validate if a digest is valid
43    async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
44
45    /// Add a digest to the object
46    async fn add_digest(&mut self, digest: String) -> Result<bool>;
47
48    /// Get messages for gossip protocol
49    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
50
51    /// Get messages since a specific digest
52    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
53
54    /// Get the current state as JSON
55    async fn get_state(&self) -> Result<Value>;
56
57    /// Reset the object to initial state
58    async fn reset(&mut self) -> Result<()>;
59
60    /// Clone the object
61    fn clone_box(&self) -> Box<dyn ApplicationObject>;
62
63    /// Get reference as Any for downcasting
64    fn as_any(&self) -> &dyn Any;
65
66    /// Get mutable reference as Any for downcasting
67    fn as_any_mut(&mut self) -> &mut dyn Any;
68}
69
70/// Simple shared number object for testing (equivalent to Python SimpleSharedNumber)
71#[derive(Debug, Clone)]
72pub struct SimpleSharedNumber {
73    id: SharedObjectId,
74    number: i64,
75    created_at: chrono::DateTime<chrono::Utc>,
76    updated_at: chrono::DateTime<chrono::Utc>,
77    locked: bool,
78    messages: Vec<SharedMessage>,
79    seen_hashes: HashSet<String>,
80    digests: Vec<String>,
81}
82
83impl SimpleSharedNumber {
84    pub fn new() -> Self {
85        Self {
86            id: SharedObjectId::new(),
87            number: 0,
88            created_at: chrono::Utc::now(),
89            updated_at: chrono::Utc::now(),
90            locked: false,
91            messages: Vec::new(),
92            seen_hashes: HashSet::new(),
93            digests: Vec::new(),
94        }
95    }
96
97    pub fn get_number(&self) -> i64 {
98        self.number
99    }
100
101    pub fn get_messages(&self) -> &[SharedMessage] {
102        &self.messages
103    }
104
105    fn calculate_message_hash(data: &Value) -> String {
106        let data_str = serde_json::to_string(&serde_json::json!({
107            "content": data
108        }))
109        .unwrap_or_default();
110        let mut hasher = Sha256::new();
111        hasher.update(data_str.as_bytes());
112        hex::encode(hasher.finalize())
113    }
114}
115
116impl Default for SimpleSharedNumber {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122#[async_trait]
123impl ApplicationObject for SimpleSharedNumber {
124    fn id(&self) -> &SharedObjectId {
125        &self.id
126    }
127
128    fn type_name(&self) -> &'static str {
129        "SimpleSharedNumber"
130    }
131
132    async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
133        // We only accept integer data
134        Ok(message.data.is_i64())
135    }
136
137    async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
138        // Deduplicate by hashing the message's data field
139        let msg_hash = Self::calculate_message_hash(&message.data);
140
141        if self.seen_hashes.contains(&msg_hash) {
142            // Already processed this data
143            return Ok(());
144        }
145
146        self.seen_hashes.insert(msg_hash);
147
148        // Extract the integer value and add to our number
149        if let Some(value) = message.data.as_i64() {
150            self.number += value;
151            self.messages.push(message);
152            tracing::info!("SimpleSharedNumber: Added message with data: {}", value);
153        }
154
155        Ok(())
156    }
157
158    fn is_merkleized(&self) -> bool {
159        false
160    }
161
162    async fn get_latest_digest(&self) -> Result<String> {
163        Ok(self.number.to_string())
164    }
165
166    async fn has_digest(&self, digest: &str) -> Result<bool> {
167        Ok(self.digests.contains(&digest.to_string()))
168    }
169
170    async fn is_valid_digest(&self, _digest: &str) -> Result<bool> {
171        Ok(true)
172    }
173
174    async fn add_digest(&mut self, digest: String) -> Result<bool> {
175        self.digests.push(digest);
176        Ok(true)
177    }
178
179    async fn gossip_messages(&self, _digest: Option<&str>) -> Result<Vec<SharedMessage>> {
180        Ok(Vec::new())
181    }
182
183    async fn get_messages_since_digest(&self, _digest: &str) -> Result<Vec<SharedMessage>> {
184        Ok(Vec::new())
185    }
186
187    async fn get_state(&self) -> Result<Value> {
188        Ok(serde_json::json!({
189            "number": self.number,
190            "message_count": self.messages.len(),
191            "seen_hashes_count": self.seen_hashes.len()
192        }))
193    }
194
195    async fn reset(&mut self) -> Result<()> {
196        self.number = 0;
197        self.messages.clear();
198        self.seen_hashes.clear();
199        self.digests.clear();
200        Ok(())
201    }
202
203    fn clone_box(&self) -> Box<dyn ApplicationObject> {
204        Box::new(self.clone())
205    }
206
207    fn as_any(&self) -> &dyn Any {
208        self
209    }
210
211    fn as_any_mut(&mut self) -> &mut dyn Any {
212        self
213    }
214}
215
216/// Registry for managing application objects
217#[derive(Debug)]
218pub struct ApplicationObjectRegistry {
219    objects: HashMap<SharedObjectId, Box<dyn ApplicationObject>>,
220    objects_by_type: HashMap<String, Vec<SharedObjectId>>,
221}
222
223impl ApplicationObjectRegistry {
224    pub fn new() -> Self {
225        Self {
226            objects: HashMap::new(),
227            objects_by_type: HashMap::new(),
228        }
229    }
230
231    /// Register a new application object
232    pub fn register(&mut self, object: Box<dyn ApplicationObject>) -> SharedObjectId {
233        let id = object.id().clone();
234        let type_name = object.type_name().to_string();
235
236        self.objects_by_type
237            .entry(type_name)
238            .or_default()
239            .push(id.clone());
240
241        self.objects.insert(id.clone(), object);
242        id
243    }
244
245    /// Get an object by ID
246    pub fn get(&self, id: &SharedObjectId) -> Option<&dyn ApplicationObject> {
247        self.objects.get(id).map(|obj| obj.as_ref())
248    }
249
250    /// Get all objects of a specific type (returning owned clones for safety)
251    pub fn get_by_type(&self, type_name: &str) -> Vec<Box<dyn ApplicationObject>> {
252        self.objects_by_type
253            .get(type_name)
254            .map(|ids| {
255                ids.iter()
256                    .filter_map(|id| self.objects.get(id))
257                    .map(|obj| obj.clone_box())
258                    .collect()
259            })
260            .unwrap_or_default()
261    }
262
263    /// Remove an object
264    pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn ApplicationObject>> {
265        if let Some(object) = self.objects.remove(id) {
266            let type_name = object.type_name().to_string();
267            if let Some(type_list) = self.objects_by_type.get_mut(&type_name) {
268                type_list.retain(|obj_id| obj_id != id);
269                if type_list.is_empty() {
270                    self.objects_by_type.remove(&type_name);
271                }
272            }
273            Some(object)
274        } else {
275            None
276        }
277    }
278
279    /// Get all object IDs
280    pub fn ids(&self) -> Vec<SharedObjectId> {
281        self.objects.keys().cloned().collect()
282    }
283
284    /// Get count of objects
285    pub fn len(&self) -> usize {
286        self.objects.len()
287    }
288
289    /// Check if registry is empty
290    pub fn is_empty(&self) -> bool {
291        self.objects.is_empty()
292    }
293
294    /// Clear all objects
295    pub fn clear(&mut self) {
296        self.objects.clear();
297        self.objects_by_type.clear();
298    }
299
300    /// Process a message against all appropriate objects
301    pub async fn process_message(&mut self, message: SharedMessage) -> Result<Vec<SharedObjectId>> {
302        let mut processed_objects = Vec::new();
303
304        // Get all object IDs first to avoid borrow checker issues
305        let ids: Vec<SharedObjectId> = self.objects.keys().cloned().collect();
306
307        // Process each object sequentially
308        for id in ids {
309            // Check validity first
310            let is_valid = if let Some(object) = self.objects.get(&id) {
311                object.is_valid(&message).await?
312            } else {
313                false
314            };
315
316            // If valid, add the message
317            if is_valid {
318                if let Some(object) = self.objects.get_mut(&id) {
319                    object.add_message(message.clone()).await?;
320                    processed_objects.push(id);
321                }
322            }
323        }
324
325        Ok(processed_objects)
326    }
327}
328
329impl Default for ApplicationObjectRegistry {
330    fn default() -> Self {
331        Self::new()
332    }
333}