// chaincraft_rust/shared.rs

//! Shared objects and messages for distributed state management
2
3use crate::error::{ChaincraftError, CryptoError, Result, SerializationError};
4use async_trait::async_trait;
5use bincode;
6use hex;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::any::Any;
10use std::collections::HashMap;
11use std::fmt::{self, Debug};
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16/// Unique identifier for shared objects
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct SharedObjectId(Uuid);
19
20impl SharedObjectId {
21    pub fn new() -> Self {
22        Self(Uuid::new_v4())
23    }
24
25    pub fn as_uuid(&self) -> &Uuid {
26        &self.0
27    }
28
29    pub fn into_uuid(self) -> Uuid {
30        self.0
31    }
32
33    pub fn from_uuid(uuid: Uuid) -> Self {
34        Self(uuid)
35    }
36}
37
38impl fmt::Display for SharedObjectId {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl Default for SharedObjectId {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
/// Message types for inter-node communication.
///
/// The built-in variants are rendered as upper-snake-case names (for example
/// `PEER_DISCOVERY`) by the `Display` impl in this file, and the hand-written
/// `Serialize`/`Deserialize` impls use those same names on the wire.
/// `Custom` carries an application-chosen name instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageType {
    /// Peer discovery message
    PeerDiscovery,
    /// Request for local peers
    RequestLocalPeers,
    /// Response with local peers
    LocalPeers,
    /// Request for shared object update
    RequestSharedObjectUpdate,
    /// Response with shared object data
    SharedObjectUpdate,
    /// Request to get an object
    Get,
    /// Request to set an object
    Set,
    /// Request to delete an object
    Delete,
    /// Response containing requested data
    Response,
    /// Notification of changes
    Notification,
    /// Heartbeat/ping message
    Heartbeat,
    /// Error response
    Error,
    /// Request latest digest for digest-based sync
    RequestDigest,
    /// Request messages since a digest
    RequestMessagesSince,
    /// Response with latest digest
    DigestResponse,
    /// Response with messages since digest
    MessagesResponse,
    /// Custom application message; the string is the application-defined type name.
    ///
    /// NOTE(review): `Display` prints the name verbatim, so a custom name that
    /// equals a built-in wire name (e.g. `"GET"`) is indistinguishable from the
    /// built-in variant in `Display` output.
    Custom(String),
}
88
89impl Serialize for MessageType {
90    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
91    where
92        S: serde::Serializer,
93    {
94        match self {
95            MessageType::Custom(name) => {
96                #[derive(Serialize)]
97                struct Custom {
98                    #[serde(rename = "Custom")]
99                    custom: String,
100                }
101                Custom {
102                    custom: name.clone(),
103                }
104                .serialize(serializer)
105            },
106            _ => serializer.serialize_str(&self.to_string()),
107        }
108    }
109}
110
111impl<'de> Deserialize<'de> for MessageType {
112    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
113    where
114        D: serde::Deserializer<'de>,
115    {
116        use serde::de::{Error, Visitor};
117        use std::fmt;
118
119        struct MessageTypeVisitor;
120
121        impl<'de> Visitor<'de> for MessageTypeVisitor {
122            type Value = MessageType;
123
124            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
125                formatter.write_str("a message type string or Custom object")
126            }
127
128            fn visit_str<E>(self, value: &str) -> std::result::Result<MessageType, E>
129            where
130                E: Error,
131            {
132                match value {
133                    "PEER_DISCOVERY" => Ok(MessageType::PeerDiscovery),
134                    "REQUEST_LOCAL_PEERS" => Ok(MessageType::RequestLocalPeers),
135                    "LOCAL_PEERS" => Ok(MessageType::LocalPeers),
136                    "REQUEST_SHARED_OBJECT_UPDATE" => Ok(MessageType::RequestSharedObjectUpdate),
137                    "SHARED_OBJECT_UPDATE" => Ok(MessageType::SharedObjectUpdate),
138                    "GET" => Ok(MessageType::Get),
139                    "SET" => Ok(MessageType::Set),
140                    "DELETE" => Ok(MessageType::Delete),
141                    "RESPONSE" => Ok(MessageType::Response),
142                    "NOTIFICATION" => Ok(MessageType::Notification),
143                    "HEARTBEAT" => Ok(MessageType::Heartbeat),
144                    "ERROR" => Ok(MessageType::Error),
145                    "REQUEST_DIGEST" => Ok(MessageType::RequestDigest),
146                    "REQUEST_MESSAGES_SINCE" => Ok(MessageType::RequestMessagesSince),
147                    "DIGEST_RESPONSE" => Ok(MessageType::DigestResponse),
148                    "MESSAGES_RESPONSE" => Ok(MessageType::MessagesResponse),
149                    _ => Ok(MessageType::Custom(value.to_string())),
150                }
151            }
152
153            fn visit_map<A>(self, mut map: A) -> std::result::Result<MessageType, A::Error>
154            where
155                A: serde::de::MapAccess<'de>,
156            {
157                if let Some(key) = map.next_key::<String>()? {
158                    if key == "Custom" {
159                        let value: String = map.next_value()?;
160                        Ok(MessageType::Custom(value))
161                    } else {
162                        Err(Error::unknown_field(&key, &["Custom"]))
163                    }
164                } else {
165                    Err(Error::missing_field("Custom"))
166                }
167            }
168        }
169
170        deserializer.deserialize_any(MessageTypeVisitor)
171    }
172}
173
174impl fmt::Display for MessageType {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        match self {
177            MessageType::PeerDiscovery => write!(f, "PEER_DISCOVERY"),
178            MessageType::RequestLocalPeers => write!(f, "REQUEST_LOCAL_PEERS"),
179            MessageType::LocalPeers => write!(f, "LOCAL_PEERS"),
180            MessageType::RequestSharedObjectUpdate => write!(f, "REQUEST_SHARED_OBJECT_UPDATE"),
181            MessageType::SharedObjectUpdate => write!(f, "SHARED_OBJECT_UPDATE"),
182            MessageType::Get => write!(f, "GET"),
183            MessageType::Set => write!(f, "SET"),
184            MessageType::Delete => write!(f, "DELETE"),
185            MessageType::Response => write!(f, "RESPONSE"),
186            MessageType::Notification => write!(f, "NOTIFICATION"),
187            MessageType::Heartbeat => write!(f, "HEARTBEAT"),
188            MessageType::Error => write!(f, "ERROR"),
189            MessageType::RequestDigest => write!(f, "REQUEST_DIGEST"),
190            MessageType::RequestMessagesSince => write!(f, "REQUEST_MESSAGES_SINCE"),
191            MessageType::DigestResponse => write!(f, "DIGEST_RESPONSE"),
192            MessageType::MessagesResponse => write!(f, "MESSAGES_RESPONSE"),
193            MessageType::Custom(name) => write!(f, "{}", name),
194        }
195    }
196}
197
/// A message that can be shared between nodes.
///
/// The constructors on this type (`new`, `new_with_target`, `custom`) fill in
/// `id`, `timestamp` and `hash` and leave `signature` empty. Equality
/// (`PartialEq`) compares only `id` and `hash`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedMessage {
    /// Unique message identifier
    pub id: SharedObjectId,
    /// Message type
    pub message_type: MessageType,
    /// Target object ID (if applicable)
    pub target_id: Option<SharedObjectId>,
    /// Message payload
    pub data: serde_json::Value,
    /// Timestamp when message was created
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional signature for authenticated messages; `None` until `sign` is
    /// called. Defaults to `None` when absent from serialized input.
    #[serde(with = "serde_bytes", default)]
    pub signature: Option<Vec<u8>>,
    /// Hash of the message content: the hex-encoded SHA-256 produced by
    /// `calculate_hash`. Being a public field, it can go stale if other
    /// fields are modified afterwards; `verify_hash` detects that.
    pub hash: String,
}
217
218impl SharedMessage {
219    /// Create a new shared message
220    pub fn new(message_type: MessageType, data: serde_json::Value) -> Self {
221        let mut message = Self {
222            id: SharedObjectId::new(),
223            message_type,
224            target_id: None,
225            data,
226            timestamp: chrono::Utc::now(),
227            signature: None,
228            hash: String::new(),
229        };
230        message.hash = message.calculate_hash();
231        message
232    }
233
234    /// Create a new message with a target
235    pub fn new_with_target(
236        message_type: MessageType,
237        target_id: SharedObjectId,
238        data: serde_json::Value,
239    ) -> Self {
240        let mut message = Self {
241            id: SharedObjectId::new(),
242            message_type,
243            target_id: Some(target_id),
244            data,
245            timestamp: chrono::Utc::now(),
246            signature: None,
247            hash: String::new(),
248        };
249        message.hash = message.calculate_hash();
250        message
251    }
252
253    /// Create a custom message
254    pub fn custom<T: Serialize>(message_type: impl Into<String>, data: T) -> Result<Self> {
255        let data = serde_json::to_value(data)
256            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))?;
257        Ok(Self::new(MessageType::Custom(message_type.into()), data))
258    }
259
260    /// Sign this message with the given private key
261    pub fn sign(&mut self, private_key: &crate::crypto::PrivateKey) -> Result<()> {
262        let message_bytes = self.to_bytes()?;
263        let signature = private_key.sign(&message_bytes)?;
264        self.signature = Some(signature.to_bytes());
265        Ok(())
266    }
267
268    /// Verify the signature of this message
269    pub fn verify_signature(&self, public_key: &crate::crypto::PublicKey) -> Result<bool> {
270        if let Some(sig_bytes) = &self.signature {
271            // Create a copy without signature for verification
272            let mut message_copy = self.clone();
273            message_copy.signature = None;
274            let message_bytes = message_copy.to_bytes()?;
275
276            // Different approach for each key type to avoid the signature creation issues
277            match public_key {
278                crate::crypto::PublicKey::Ed25519(pk) => {
279                    if sig_bytes.len() != 64 {
280                        return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
281                    }
282
283                    // Create the signature bytes array safely
284                    let mut sig_array = [0u8; 64];
285                    sig_array.copy_from_slice(&sig_bytes[0..64]);
286
287                    // In ed25519_dalek 2.0, from_bytes returns a Signature directly
288                    let signature = ed25519_dalek::Signature::from_bytes(&sig_array);
289
290                    use ed25519_dalek::Verifier;
291                    Ok(pk.verify(&message_bytes, &signature).is_ok())
292                },
293                crate::crypto::PublicKey::Secp256k1(pk) => {
294                    let sig_result = k256::ecdsa::Signature::from_slice(sig_bytes.as_slice());
295                    if sig_result.is_err() {
296                        return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
297                    }
298
299                    use k256::ecdsa::{signature::Verifier, VerifyingKey};
300                    let verifying_key = VerifyingKey::from(pk);
301                    Ok(verifying_key
302                        .verify(&message_bytes, &sig_result.unwrap())
303                        .is_ok())
304                },
305            }
306        } else {
307            Ok(false)
308        }
309    }
310
311    /// Calculate the hash of this message
312    pub fn calculate_hash(&self) -> String {
313        let mut hasher = Sha256::new();
314        if let Ok(id_bytes) = bincode::serialize(self.id.as_uuid()) {
315            hasher.update(&id_bytes);
316        }
317        hasher.update(self.message_type.to_string().as_bytes());
318        hasher.update(self.data.to_string().as_bytes());
319        hasher.update(self.timestamp.to_rfc3339().as_bytes());
320        hex::encode(hasher.finalize())
321    }
322
323    /// Verify the message hash
324    pub fn verify_hash(&self) -> bool {
325        self.hash == self.calculate_hash()
326    }
327
328    /// Get the message size in bytes
329    pub fn size(&self) -> usize {
330        bincode::serialized_size(self).unwrap_or(0) as usize
331    }
332
333    /// Convert message to bytes for signing/verification
334    pub fn to_bytes(&self) -> Result<Vec<u8>> {
335        bincode::serialize(self)
336            .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
337    }
338
339    /// Create message from bytes
340    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
341        bincode::deserialize(bytes)
342            .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
343    }
344
345    /// Serialize to JSON
346    pub fn to_json(&self) -> Result<String> {
347        serde_json::to_string(self)
348            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
349    }
350
351    /// Deserialize from JSON
352    pub fn from_json(json: &str) -> Result<Self> {
353        serde_json::from_str(json)
354            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
355    }
356}
357
358impl PartialEq for SharedMessage {
359    fn eq(&self, other: &Self) -> bool {
360        self.id == other.id && self.hash == other.hash
361    }
362}
363
364impl Eq for SharedMessage {}
365
/// Trait for objects that can be shared and synchronized across nodes.
///
/// Implementors must be `Send + Sync + Debug`. The trait is declared with
/// `#[async_trait]` and is used as a boxed trait object
/// (`Box<dyn SharedObject>`) elsewhere in this file. Only signatures are
/// declared here; the exact semantics of each method are up to the
/// implementor.
#[async_trait]
pub trait SharedObject: Send + Sync + Debug {
    /// Get the unique identifier for this shared object
    fn id(&self) -> SharedObjectId;

    /// Get the type name of this shared object
    fn type_name(&self) -> &'static str;

    /// Validate if a message is valid for this shared object
    async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;

    /// Process a validated message and update the object state.
    ///
    /// Takes ownership of the message.
    // NOTE(review): presumably callers are expected to check `is_valid`
    // first — confirm against implementors.
    async fn add_message(&mut self, message: SharedMessage) -> Result<()>;

    /// Check if this object supports merkleized synchronization
    fn is_merkleized(&self) -> bool;

    /// Get the latest state digest for synchronization
    async fn get_latest_digest(&self) -> Result<String>;

    /// Check if the object has a specific digest
    async fn has_digest(&self, digest: &str) -> Result<bool>;

    /// Validate if a digest is valid
    async fn is_valid_digest(&self, digest: &str) -> Result<bool>;

    /// Add a digest to the object.
    // NOTE(review): the meaning of the returned `bool` is not visible from
    // this file (possibly "was newly added") — confirm against implementors.
    async fn add_digest(&mut self, digest: String) -> Result<bool>;

    /// Get messages for gossip protocol, optionally relative to `digest`
    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;

    /// Get messages since a specific digest
    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;

    /// Get the current state as a serializable value
    async fn get_state(&self) -> Result<serde_json::Value>;

    /// Reset the object to a clean state
    async fn reset(&mut self) -> Result<()>;

    /// Serialize the object to JSON
    fn to_json(&self) -> Result<serde_json::Value>;

    /// Update the object from JSON data
    async fn apply_json(&mut self, data: serde_json::Value) -> Result<()>;

    /// Clone the object as a trait object.
    ///
    /// This is what the `Clone` impl for `Box<dyn SharedObject>` calls.
    fn clone_box(&self) -> Box<dyn SharedObject>;

    /// Get a reference to self as Any for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Get a mutable reference to self as Any for downcasting
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Called when the object is accessed
    async fn on_access(&mut self) -> Result<()>;

    /// Called when the object is modified
    async fn on_modify(&mut self) -> Result<()>;

    /// Called when the object is deleted
    async fn on_delete(&mut self) -> Result<()>;

    /// Validate the object's current state
    async fn validate(&self) -> Result<bool>;

    /// Get object metadata as string key/value pairs
    fn metadata(&self) -> HashMap<String, String>;
}
438
439impl Clone for Box<dyn SharedObject> {
440    fn clone(&self) -> Self {
441        self.clone_box()
442    }
443}
444
445/// Digest for tracking shared object state
446#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
447pub struct StateDigest {
448    /// The digest hash
449    pub hash: String,
450    /// Timestamp when digest was created
451    pub timestamp: chrono::DateTime<chrono::Utc>,
452    /// Number of messages included in this digest
453    pub message_count: u64,
454}
455
456impl StateDigest {
457    /// Create a new state digest
458    pub fn new(hash: String, message_count: u64) -> Self {
459        Self {
460            hash,
461            timestamp: chrono::Utc::now(),
462            message_count,
463        }
464    }
465
466    /// Calculate a digest from messages
467    pub fn from_messages(messages: &[SharedMessage]) -> Self {
468        let mut hasher = Sha256::new();
469        for message in messages {
470            hasher.update(message.hash.as_bytes());
471        }
472        let hash = hex::encode(hasher.finalize());
473        Self::new(hash, messages.len() as u64)
474    }
475}
476
/// Registry for managing shared objects, keyed by their `SharedObjectId`.
pub struct SharedObjectRegistry {
    /// Registered objects, indexed by the id each object reports.
    objects: HashMap<SharedObjectId, Box<dyn SharedObject>>,
    // NOTE(review): `lock` is created in `new` but never read in this file,
    // hence the `dead_code` allowance; mutation is guarded by `&mut self`
    // instead. Presumably reserved for future coordination — confirm or remove.
    #[allow(dead_code)]
    lock: Arc<RwLock<()>>,
}
483
484impl Default for SharedObjectRegistry {
485    fn default() -> Self {
486        Self::new()
487    }
488}
489
490impl SharedObjectRegistry {
491    /// Create a new empty registry
492    pub fn new() -> Self {
493        Self {
494            objects: HashMap::new(),
495            lock: Arc::new(RwLock::new(())),
496        }
497    }
498
499    /// Register a shared object
500    pub fn register(&mut self, object: Box<dyn SharedObject>) -> SharedObjectId {
501        let id = object.id();
502        self.objects.insert(id.clone(), object);
503        id
504    }
505
506    /// Get a reference to an object by ID
507    pub fn get(&self, id: &SharedObjectId) -> Option<&dyn SharedObject> {
508        self.objects.get(id).map(|obj| obj.as_ref())
509    }
510
511    /// Remove an object from the registry
512    pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn SharedObject>> {
513        self.objects.remove(id)
514    }
515
516    /// Get all object IDs
517    pub fn ids(&self) -> Vec<SharedObjectId> {
518        self.objects.keys().cloned().collect()
519    }
520
521    /// Get the number of registered objects
522    pub fn len(&self) -> usize {
523        self.objects.len()
524    }
525
526    /// Check if the registry is empty
527    pub fn is_empty(&self) -> bool {
528        self.objects.is_empty()
529    }
530
531    /// Clear all objects from the registry
532    pub fn clear(&mut self) {
533        self.objects.clear();
534    }
535
536    /// Get objects by type
537    pub fn get_by_type(&self, type_name: &str) -> Vec<&dyn SharedObject> {
538        self.objects
539            .values()
540            .filter(|obj| obj.type_name() == type_name)
541            .map(|obj| obj.as_ref())
542            .collect()
543    }
544
545    /// Check if an object exists
546    pub fn contains(&self, id: &SharedObjectId) -> bool {
547        self.objects.contains_key(id)
548    }
549}
550
551impl Debug for SharedObjectRegistry {
552    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
553        f.debug_struct("SharedObjectRegistry")
554            .field("object_count", &self.objects.len())
555            .field("object_ids", &self.ids())
556            .finish()
557    }
558}