Skip to main content

chaincraft_rust/
shared_object.rs

1//! Enhanced shared object implementation with application-specific logic
2
3pub use crate::shared::SharedObjectId;
4use crate::{
5    error::{ChaincraftError, Result},
6    shared::{MessageType, SharedMessage, SharedObject},
7};
8use async_trait::async_trait;
9use chrono;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha2::{Digest, Sha256};
13use std::any::Any;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18/// Enhanced shared object trait with application-specific functionality
19#[async_trait]
20pub trait ApplicationObject: Send + Sync + std::fmt::Debug {
21    /// Get the object's unique identifier
22    fn id(&self) -> &SharedObjectId;
23
24    /// Get the object's type name
25    fn type_name(&self) -> &'static str;
26
27    /// Validate if a message is valid for this object
28    async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
29
30    /// Add a validated message to the object
31    async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
32
33    /// Check if this object supports merkleized synchronization
34    fn is_merkleized(&self) -> bool;
35
36    /// Get the latest state digest
37    async fn get_latest_digest(&self) -> Result<String>;
38
39    /// Check if object has a specific digest
40    async fn has_digest(&self, digest: &str) -> Result<bool>;
41
42    /// Validate if a digest is valid
43    async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
44
45    /// Add a digest to the object
46    async fn add_digest(&mut self, digest: String) -> Result<bool>;
47
48    /// Get messages for gossip protocol
49    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
50
51    /// Get messages since a specific digest
52    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
53
54    /// Get the current state as JSON
55    async fn get_state(&self) -> Result<Value>;
56
57    /// Reset the object to initial state
58    async fn reset(&mut self) -> Result<()>;
59
60    /// Clone the object
61    fn clone_box(&self) -> Box<dyn ApplicationObject>;
62
63    /// Get reference as Any for downcasting
64    fn as_any(&self) -> &dyn Any;
65
66    /// Get mutable reference as Any for downcasting
67    fn as_any_mut(&mut self) -> &mut dyn Any;
68}
69
70/// Simple shared number object for testing (equivalent to Python SimpleSharedNumber)
71#[derive(Debug, Clone)]
72pub struct SimpleSharedNumber {
73    id: SharedObjectId,
74    number: i64,
75    created_at: chrono::DateTime<chrono::Utc>,
76    updated_at: chrono::DateTime<chrono::Utc>,
77    locked: bool,
78    messages: Vec<SharedMessage>,
79    seen_hashes: HashSet<String>,
80    digests: Vec<String>,
81}
82
83impl SimpleSharedNumber {
84    pub fn new() -> Self {
85        Self {
86            id: SharedObjectId::new(),
87            number: 0,
88            created_at: chrono::Utc::now(),
89            updated_at: chrono::Utc::now(),
90            locked: false,
91            messages: Vec::new(),
92            seen_hashes: HashSet::new(),
93            digests: Vec::new(),
94        }
95    }
96
97    pub fn get_number(&self) -> i64 {
98        self.number
99    }
100
101    pub fn get_messages(&self) -> &[SharedMessage] {
102        &self.messages
103    }
104
105    fn calculate_message_hash(data: &Value) -> String {
106        let data_str = serde_json::to_string(&serde_json::json!({
107            "content": data
108        }))
109        .unwrap_or_default();
110        let mut hasher = Sha256::new();
111        hasher.update(data_str.as_bytes());
112        hex::encode(hasher.finalize())
113    }
114}
115
116impl Default for SimpleSharedNumber {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122#[async_trait]
123impl ApplicationObject for SimpleSharedNumber {
124    fn id(&self) -> &SharedObjectId {
125        &self.id
126    }
127
128    fn type_name(&self) -> &'static str {
129        "SimpleSharedNumber"
130    }
131
132    async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
133        // We only accept integer data
134        Ok(message.data.is_i64())
135    }
136
137    async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
138        // Deduplicate by hashing the message's data field
139        let msg_hash = Self::calculate_message_hash(&message.data);
140
141        if self.seen_hashes.contains(&msg_hash) {
142            // Already processed this data
143            return Ok(());
144        }
145
146        self.seen_hashes.insert(msg_hash);
147
148        // Extract the integer value and add to our number
149        if let Some(value) = message.data.as_i64() {
150            self.number += value;
151            self.messages.push(message);
152            tracing::info!("SimpleSharedNumber: Added message with data: {}", value);
153        }
154
155        Ok(())
156    }
157
158    fn is_merkleized(&self) -> bool {
159        false
160    }
161
162    async fn get_latest_digest(&self) -> Result<String> {
163        Ok(self.number.to_string())
164    }
165
166    async fn has_digest(&self, digest: &str) -> Result<bool> {
167        Ok(self.digests.contains(&digest.to_string()))
168    }
169
170    async fn is_valid_digest(&self, _digest: &str) -> Result<bool> {
171        Ok(true)
172    }
173
174    async fn add_digest(&mut self, digest: String) -> Result<bool> {
175        self.digests.push(digest);
176        Ok(true)
177    }
178
179    async fn gossip_messages(&self, _digest: Option<&str>) -> Result<Vec<SharedMessage>> {
180        Ok(Vec::new())
181    }
182
183    async fn get_messages_since_digest(&self, _digest: &str) -> Result<Vec<SharedMessage>> {
184        Ok(Vec::new())
185    }
186
187    async fn get_state(&self) -> Result<Value> {
188        Ok(serde_json::json!({
189            "number": self.number,
190            "message_count": self.messages.len(),
191            "seen_hashes_count": self.seen_hashes.len()
192        }))
193    }
194
195    async fn reset(&mut self) -> Result<()> {
196        self.number = 0;
197        self.messages.clear();
198        self.seen_hashes.clear();
199        self.digests.clear();
200        Ok(())
201    }
202
203    fn clone_box(&self) -> Box<dyn ApplicationObject> {
204        Box::new(self.clone())
205    }
206
207    fn as_any(&self) -> &dyn Any {
208        self
209    }
210
211    fn as_any_mut(&mut self) -> &mut dyn Any {
212        self
213    }
214}
215
216/// Merkelized chain object (Rust analogue of Python SimpleChainObject)
217///
218/// Maintains an append-only chain of SHA256 hashes where each hash is derived from
219/// the previous hash. Supports digest-based synchronization for efficient
220/// state sync between nodes.
221#[derive(Debug, Clone)]
222pub struct MerkelizedChain {
223    id: SharedObjectId,
224    /// The chain of hashes (genesis is at index 0)
225    chain: Vec<String>,
226    /// Messages corresponding to each chain entry (optional payload)
227    messages: Vec<SharedMessage>,
228    /// Set of all hashes in the chain for O(1) lookup
229    hash_set: HashSet<String>,
230    created_at: chrono::DateTime<chrono::Utc>,
231}
232
233impl MerkelizedChain {
234    /// Create a new MerkelizedChain with a genesis hash
235    pub fn new() -> Self {
236        let genesis = Self::calculate_hash("genesis");
237        Self {
238            id: SharedObjectId::new(),
239            chain: vec![genesis.clone()],
240            messages: vec![SharedMessage::new(
241                MessageType::Custom("genesis".to_string()),
242                serde_json::json!("genesis"),
243            )],
244            hash_set: {
245                let mut set = HashSet::new();
246                set.insert(genesis);
247                set
248            },
249            created_at: chrono::Utc::now(),
250        }
251    }
252
253    /// Calculate the SHA256 hash of data (consistent with Python)
254    pub fn calculate_hash(data: &str) -> String {
255        let mut hasher = Sha256::new();
256        hasher.update(data.as_bytes());
257        hex::encode(hasher.finalize())
258    }
259
260    /// Calculate what the next hash should be given a previous hash
261    pub fn calculate_next_hash(prev_hash: &str) -> String {
262        Self::calculate_hash(prev_hash)
263    }
264
265    /// Get the current chain length (including genesis)
266    pub fn chain_length(&self) -> usize {
267        self.chain.len()
268    }
269
270    /// Get the genesis hash
271    pub fn genesis_hash(&self) -> &str {
272        &self.chain[0]
273    }
274
275    /// Get the latest (tip) hash
276    pub fn latest_hash(&self) -> &str {
277        self.chain.last().expect("chain is never empty")
278    }
279
280    /// Get a specific hash by index
281    pub fn hash_at(&self, index: usize) -> Option<&str> {
282        self.chain.get(index).map(|s| s.as_str())
283    }
284
285    /// Check if a hash is valid as the next hash in the chain
286    pub fn is_valid_next_hash(&self, hash: &str) -> bool {
287        let expected = Self::calculate_next_hash(self.latest_hash());
288        hash == expected
289    }
290
291    /// Add a next hash to the chain (returns the new hash)
292    pub fn add_next_hash(&mut self) -> String {
293        let next_hash = Self::calculate_next_hash(self.latest_hash());
294        self.chain.push(next_hash.clone());
295        self.hash_set.insert(next_hash.clone());
296        
297        // Create a message for this hash
298        let msg = SharedMessage::new(
299            MessageType::Custom("chain_update".to_string()),
300            serde_json::json!(next_hash),
301        );
302        self.messages.push(msg);
303        
304        next_hash
305    }
306
307    /// Try to add an existing hash to the chain (for sync from peers)
308    /// Returns true if the hash was added, false if invalid or duplicate
309    pub fn try_add_hash(&mut self, hash: &str) -> bool {
310        // Skip if already in chain
311        if self.hash_set.contains(hash) {
312            return false;
313        }
314
315        // Check if this hash follows from any hash in our chain
316        for i in 0..self.chain.len() {
317            let expected_next = Self::calculate_next_hash(&self.chain[i]);
318            if hash == expected_next {
319                self.chain.push(hash.to_string());
320                self.hash_set.insert(hash.to_string());
321                
322                let msg = SharedMessage::new(
323                    MessageType::Custom("chain_update".to_string()),
324                    serde_json::json!(hash),
325                );
326                self.messages.push(msg);
327                
328                return true;
329            }
330        }
331
332        false
333    }
334
335    /// Get the chain as a slice of hashes
336    pub fn chain(&self) -> &[String] {
337        &self.chain
338    }
339
340    /// Find the index of a hash in the chain
341    pub fn find_hash_index(&self, hash: &str) -> Option<usize> {
342        self.chain.iter().position(|h| h == hash)
343    }
344}
345
346impl Default for MerkelizedChain {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352#[async_trait]
353impl ApplicationObject for MerkelizedChain {
354    fn id(&self) -> &SharedObjectId {
355        &self.id
356    }
357
358    fn type_name(&self) -> &'static str {
359        "MerkelizedChain"
360    }
361
362    async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
363        // Accept string messages that are valid hashes
364        let Some(hash) = message.data.as_str() else {
365            return Ok(false);
366        };
367
368        // If already in chain, it's valid (for deduplication)
369        if self.hash_set.contains(hash) {
370            return Ok(true);
371        }
372
373        // Check if it's a valid next hash from any existing hash
374        for i in 0..self.chain.len() {
375            let expected_next = Self::calculate_next_hash(&self.chain[i]);
376            if hash == expected_next {
377                return Ok(true);
378            }
379        }
380
381        Ok(false)
382    }
383
384    async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
385        let Some(hash) = message.data.as_str() else {
386            return Ok(());
387        };
388
389        // Skip if already in chain
390        if self.hash_set.contains(hash) {
391            return Ok(());
392        }
393
394        // Try to add to chain
395        if self.try_add_hash(hash) {
396            tracing::info!("MerkelizedChain: Added hash {} to chain (length: {})", 
397                &hash[..8.min(hash.len())], self.chain.len());
398        }
399
400        Ok(())
401    }
402
403    fn is_merkleized(&self) -> bool {
404        true
405    }
406
407    async fn get_latest_digest(&self) -> Result<String> {
408        Ok(self.latest_hash().to_string())
409    }
410
411    async fn has_digest(&self, digest: &str) -> Result<bool> {
412        Ok(self.hash_set.contains(digest))
413    }
414
415    async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
416        Ok(self.hash_set.contains(digest) || self.is_valid_next_hash(digest))
417    }
418
419    async fn add_digest(&mut self, digest: String) -> Result<bool> {
420        if self.try_add_hash(&digest) {
421            Ok(true)
422        } else {
423            Ok(false)
424        }
425    }
426
427    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
428        let start_index = match digest {
429            Some(hash) => {
430                match self.find_hash_index(hash) {
431                    Some(idx) => idx + 1, // Start after the given digest
432                    None => return Ok(Vec::new()), // Unknown digest
433                }
434            }
435            None => 1, // Skip genesis, return all subsequent
436        };
437
438        let mut result = Vec::new();
439        for i in start_index..self.chain.len() {
440            let msg = SharedMessage::new(
441                MessageType::Custom("chain_update".to_string()),
442                serde_json::json!(self.chain[i]),
443            );
444            result.push(msg);
445        }
446
447        Ok(result)
448    }
449
450    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
451        self.gossip_messages(Some(digest)).await
452    }
453
454    async fn get_state(&self) -> Result<Value> {
455        Ok(serde_json::json!({
456            "chain_length": self.chain.len(),
457            "latest_hash": self.latest_hash(),
458            "genesis_hash": self.genesis_hash(),
459        }))
460    }
461
462    async fn reset(&mut self) -> Result<()> {
463        let genesis = Self::calculate_hash("genesis");
464        self.chain = vec![genesis.clone()];
465        self.hash_set = {
466            let mut set = HashSet::new();
467            set.insert(genesis);
468            set
469        };
470        self.messages = vec![SharedMessage::new(
471            MessageType::Custom("genesis".to_string()),
472            serde_json::json!("genesis"),
473        )];
474        Ok(())
475    }
476
477    fn clone_box(&self) -> Box<dyn ApplicationObject> {
478        Box::new(self.clone())
479    }
480
481    fn as_any(&self) -> &dyn Any {
482        self
483    }
484
485    fn as_any_mut(&mut self) -> &mut dyn Any {
486        self
487    }
488}
489
490/// Message chain - append-only ordered log of messages.
491/// Mirrors Python message chain concept: ordered sequence of messages
492/// with digest-based sync support.
493#[derive(Debug, Clone)]
494pub struct MessageChain {
495    id: SharedObjectId,
496    messages: Vec<SharedMessage>,
497    seen_hashes: HashSet<String>,
498    digests: Vec<String>,
499}
500
501impl MessageChain {
502    pub fn new() -> Self {
503        Self {
504            id: SharedObjectId::new(),
505            messages: Vec::new(),
506            seen_hashes: HashSet::new(),
507            digests: Vec::new(),
508        }
509    }
510
511    pub fn len(&self) -> usize {
512        self.messages.len()
513    }
514
515    pub fn messages(&self) -> &[SharedMessage] {
516        &self.messages
517    }
518
519    fn msg_hash(msg: &SharedMessage) -> String {
520        msg.hash.clone()
521    }
522}
523
524impl Default for MessageChain {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530#[async_trait]
531impl ApplicationObject for MessageChain {
532    fn id(&self) -> &SharedObjectId {
533        &self.id
534    }
535
536    fn type_name(&self) -> &'static str {
537        "MessageChain"
538    }
539
540    async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
541        Ok(!message.data.is_null())
542    }
543
544    async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
545        let h = Self::msg_hash(&message);
546        if self.seen_hashes.contains(&h) {
547            return Ok(());
548        }
549        self.seen_hashes.insert(h);
550        self.messages.push(message);
551        Ok(())
552    }
553
554    fn is_merkleized(&self) -> bool {
555        true
556    }
557
558    async fn get_latest_digest(&self) -> Result<String> {
559        Ok(self
560            .messages
561            .last()
562            .map(|m| m.hash.clone())
563            .unwrap_or_else(|| "genesis".to_string()))
564    }
565
566    async fn has_digest(&self, digest: &str) -> Result<bool> {
567        Ok(self.digests.contains(&digest.to_string())
568            || self.messages.iter().any(|m| m.hash == digest))
569    }
570
571    async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
572        Ok(self.has_digest(digest).await?
573            || digest == "genesis"
574            || self.seen_hashes.contains(digest))
575    }
576
577    async fn add_digest(&mut self, _digest: String) -> Result<bool> {
578        Ok(false)
579    }
580
581    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
582        let start = match digest {
583            Some(d) if d != "genesis" => {
584                self.messages
585                    .iter()
586                    .position(|m| m.hash == d)
587                    .map(|i| i + 1)
588                    .unwrap_or(0)
589            }
590            _ => 0,
591        };
592        Ok(self.messages[start..].to_vec())
593    }
594
595    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
596        self.gossip_messages(Some(digest)).await
597    }
598
599    async fn get_state(&self) -> Result<Value> {
600        Ok(serde_json::json!({
601            "length": self.messages.len(),
602            "message_count": self.messages.len()
603        }))
604    }
605
606    async fn reset(&mut self) -> Result<()> {
607        self.messages.clear();
608        self.seen_hashes.clear();
609        self.digests.clear();
610        Ok(())
611    }
612
613    fn clone_box(&self) -> Box<dyn ApplicationObject> {
614        Box::new(self.clone())
615    }
616
617    fn as_any(&self) -> &dyn Any {
618        self
619    }
620
621    fn as_any_mut(&mut self) -> &mut dyn Any {
622        self
623    }
624}
625
626/// Registry for managing application objects
627#[derive(Debug)]
628pub struct ApplicationObjectRegistry {
629    objects: HashMap<SharedObjectId, Box<dyn ApplicationObject>>,
630    objects_by_type: HashMap<String, Vec<SharedObjectId>>,
631}
632
633impl ApplicationObjectRegistry {
634    pub fn new() -> Self {
635        Self {
636            objects: HashMap::new(),
637            objects_by_type: HashMap::new(),
638        }
639    }
640
641    /// Register a new application object
642    pub fn register(&mut self, object: Box<dyn ApplicationObject>) -> SharedObjectId {
643        let id = object.id().clone();
644        let type_name = object.type_name().to_string();
645
646        self.objects_by_type
647            .entry(type_name)
648            .or_default()
649            .push(id.clone());
650
651        self.objects.insert(id.clone(), object);
652        id
653    }
654
655    /// Get an object by ID
656    pub fn get(&self, id: &SharedObjectId) -> Option<&dyn ApplicationObject> {
657        self.objects.get(id).map(|obj| obj.as_ref())
658    }
659
660    /// Get all objects of a specific type (returning owned clones for safety)
661    pub fn get_by_type(&self, type_name: &str) -> Vec<Box<dyn ApplicationObject>> {
662        self.objects_by_type
663            .get(type_name)
664            .map(|ids| {
665                ids.iter()
666                    .filter_map(|id| self.objects.get(id))
667                    .map(|obj| obj.clone_box())
668                    .collect()
669            })
670            .unwrap_or_default()
671    }
672
673    /// Remove an object
674    pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn ApplicationObject>> {
675        if let Some(object) = self.objects.remove(id) {
676            let type_name = object.type_name().to_string();
677            if let Some(type_list) = self.objects_by_type.get_mut(&type_name) {
678                type_list.retain(|obj_id| obj_id != id);
679                if type_list.is_empty() {
680                    self.objects_by_type.remove(&type_name);
681                }
682            }
683            Some(object)
684        } else {
685            None
686        }
687    }
688
689    /// Get all object IDs
690    pub fn ids(&self) -> Vec<SharedObjectId> {
691        self.objects.keys().cloned().collect()
692    }
693
694    /// Get count of objects
695    pub fn len(&self) -> usize {
696        self.objects.len()
697    }
698
699    /// Check if registry is empty
700    pub fn is_empty(&self) -> bool {
701        self.objects.is_empty()
702    }
703
704    /// Clear all objects
705    pub fn clear(&mut self) {
706        self.objects.clear();
707        self.objects_by_type.clear();
708    }
709
710    /// Process a message against all appropriate objects
711    pub async fn process_message(&mut self, message: SharedMessage) -> Result<Vec<SharedObjectId>> {
712        let mut processed_objects = Vec::new();
713
714        // Get all object IDs first to avoid borrow checker issues
715        let ids: Vec<SharedObjectId> = self.objects.keys().cloned().collect();
716
717        // Process each object sequentially
718        for id in ids {
719            // Check validity first
720            let is_valid = if let Some(object) = self.objects.get(&id) {
721                object.is_valid(&message).await?
722            } else {
723                false
724            };
725
726            // If valid, add the message
727            if is_valid {
728                if let Some(object) = self.objects.get_mut(&id) {
729                    object.add_message(message.clone()).await?;
730                    processed_objects.push(id);
731                }
732            }
733        }
734
735        Ok(processed_objects)
736    }
737}
738
739impl Default for ApplicationObjectRegistry {
740    fn default() -> Self {
741        Self::new()
742    }
743}