chaincraft_rust/
shared.rs

1//! Shared objects and messages for distributed state management
2
3use crate::error::{ChaincraftError, CryptoError, Result, SerializationError};
4use async_trait::async_trait;
5use bincode;
6use hex;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::any::Any;
10use std::collections::HashMap;
11use std::fmt::{self, Debug};
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16/// Unique identifier for shared objects
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct SharedObjectId(Uuid);
19
20impl SharedObjectId {
21    pub fn new() -> Self {
22        Self(Uuid::new_v4())
23    }
24
25    pub fn as_uuid(&self) -> &Uuid {
26        &self.0
27    }
28
29    pub fn into_uuid(self) -> Uuid {
30        self.0
31    }
32
33    pub fn from_uuid(uuid: Uuid) -> Self {
34        Self(uuid)
35    }
36}
37
38impl fmt::Display for SharedObjectId {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl Default for SharedObjectId {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50/// Message types for inter-node communication
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum MessageType {
53    /// Peer discovery message
54    PeerDiscovery,
55    /// Request for local peers
56    RequestLocalPeers,
57    /// Response with local peers
58    LocalPeers,
59    /// Request for shared object update
60    RequestSharedObjectUpdate,
61    /// Response with shared object data
62    SharedObjectUpdate,
63    /// Request to get an object
64    Get,
65    /// Request to set an object
66    Set,
67    /// Request to delete an object
68    Delete,
69    /// Response containing requested data
70    Response,
71    /// Notification of changes
72    Notification,
73    /// Heartbeat/ping message
74    Heartbeat,
75    /// Error response
76    Error,
77    /// Custom application message
78    Custom(String),
79}
80
81impl Serialize for MessageType {
82    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
83    where
84        S: serde::Serializer,
85    {
86        match self {
87            MessageType::Custom(name) => {
88                #[derive(Serialize)]
89                struct Custom {
90                    #[serde(rename = "Custom")]
91                    custom: String,
92                }
93                Custom {
94                    custom: name.clone(),
95                }
96                .serialize(serializer)
97            },
98            _ => serializer.serialize_str(&self.to_string()),
99        }
100    }
101}
102
103impl<'de> Deserialize<'de> for MessageType {
104    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
105    where
106        D: serde::Deserializer<'de>,
107    {
108        use serde::de::{Error, Visitor};
109        use std::fmt;
110
111        struct MessageTypeVisitor;
112
113        impl<'de> Visitor<'de> for MessageTypeVisitor {
114            type Value = MessageType;
115
116            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
117                formatter.write_str("a message type string or Custom object")
118            }
119
120            fn visit_str<E>(self, value: &str) -> std::result::Result<MessageType, E>
121            where
122                E: Error,
123            {
124                match value {
125                    "PEER_DISCOVERY" => Ok(MessageType::PeerDiscovery),
126                    "REQUEST_LOCAL_PEERS" => Ok(MessageType::RequestLocalPeers),
127                    "LOCAL_PEERS" => Ok(MessageType::LocalPeers),
128                    "REQUEST_SHARED_OBJECT_UPDATE" => Ok(MessageType::RequestSharedObjectUpdate),
129                    "SHARED_OBJECT_UPDATE" => Ok(MessageType::SharedObjectUpdate),
130                    "GET" => Ok(MessageType::Get),
131                    "SET" => Ok(MessageType::Set),
132                    "DELETE" => Ok(MessageType::Delete),
133                    "RESPONSE" => Ok(MessageType::Response),
134                    "NOTIFICATION" => Ok(MessageType::Notification),
135                    "HEARTBEAT" => Ok(MessageType::Heartbeat),
136                    "ERROR" => Ok(MessageType::Error),
137                    _ => Ok(MessageType::Custom(value.to_string())),
138                }
139            }
140
141            fn visit_map<A>(self, mut map: A) -> std::result::Result<MessageType, A::Error>
142            where
143                A: serde::de::MapAccess<'de>,
144            {
145                if let Some(key) = map.next_key::<String>()? {
146                    if key == "Custom" {
147                        let value: String = map.next_value()?;
148                        Ok(MessageType::Custom(value))
149                    } else {
150                        Err(Error::unknown_field(&key, &["Custom"]))
151                    }
152                } else {
153                    Err(Error::missing_field("Custom"))
154                }
155            }
156        }
157
158        deserializer.deserialize_any(MessageTypeVisitor)
159    }
160}
161
162impl fmt::Display for MessageType {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        match self {
165            MessageType::PeerDiscovery => write!(f, "PEER_DISCOVERY"),
166            MessageType::RequestLocalPeers => write!(f, "REQUEST_LOCAL_PEERS"),
167            MessageType::LocalPeers => write!(f, "LOCAL_PEERS"),
168            MessageType::RequestSharedObjectUpdate => write!(f, "REQUEST_SHARED_OBJECT_UPDATE"),
169            MessageType::SharedObjectUpdate => write!(f, "SHARED_OBJECT_UPDATE"),
170            MessageType::Get => write!(f, "GET"),
171            MessageType::Set => write!(f, "SET"),
172            MessageType::Delete => write!(f, "DELETE"),
173            MessageType::Response => write!(f, "RESPONSE"),
174            MessageType::Notification => write!(f, "NOTIFICATION"),
175            MessageType::Heartbeat => write!(f, "HEARTBEAT"),
176            MessageType::Error => write!(f, "ERROR"),
177            MessageType::Custom(name) => write!(f, "{}", name),
178        }
179    }
180}
181
182/// A message that can be shared between nodes
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct SharedMessage {
185    /// Unique message identifier
186    pub id: SharedObjectId,
187    /// Message type
188    pub message_type: MessageType,
189    /// Target object ID (if applicable)
190    pub target_id: Option<SharedObjectId>,
191    /// Message payload
192    pub data: serde_json::Value,
193    /// Timestamp when message was created
194    pub timestamp: chrono::DateTime<chrono::Utc>,
195    /// Optional signature for authenticated messages
196    #[serde(with = "serde_bytes", default)]
197    pub signature: Option<Vec<u8>>,
198    /// Hash of the message content
199    pub hash: String,
200}
201
202impl SharedMessage {
203    /// Create a new shared message
204    pub fn new(message_type: MessageType, data: serde_json::Value) -> Self {
205        let mut message = Self {
206            id: SharedObjectId::new(),
207            message_type,
208            target_id: None,
209            data,
210            timestamp: chrono::Utc::now(),
211            signature: None,
212            hash: String::new(),
213        };
214        message.hash = message.calculate_hash();
215        message
216    }
217
218    /// Create a new message with a target
219    pub fn new_with_target(
220        message_type: MessageType,
221        target_id: SharedObjectId,
222        data: serde_json::Value,
223    ) -> Self {
224        let mut message = Self {
225            id: SharedObjectId::new(),
226            message_type,
227            target_id: Some(target_id),
228            data,
229            timestamp: chrono::Utc::now(),
230            signature: None,
231            hash: String::new(),
232        };
233        message.hash = message.calculate_hash();
234        message
235    }
236
237    /// Create a custom message
238    pub fn custom<T: Serialize>(message_type: impl Into<String>, data: T) -> Result<Self> {
239        let data = serde_json::to_value(data)
240            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))?;
241        Ok(Self::new(MessageType::Custom(message_type.into()), data))
242    }
243
244    /// Sign this message with the given private key
245    pub fn sign(&mut self, private_key: &crate::crypto::PrivateKey) -> Result<()> {
246        let message_bytes = self.to_bytes()?;
247        let signature = private_key.sign(&message_bytes)?;
248        self.signature = Some(signature.to_bytes());
249        Ok(())
250    }
251
252    /// Verify the signature of this message
253    pub fn verify_signature(&self, public_key: &crate::crypto::PublicKey) -> Result<bool> {
254        if let Some(sig_bytes) = &self.signature {
255            // Create a copy without signature for verification
256            let mut message_copy = self.clone();
257            message_copy.signature = None;
258            let message_bytes = message_copy.to_bytes()?;
259
260            // Different approach for each key type to avoid the signature creation issues
261            match public_key {
262                crate::crypto::PublicKey::Ed25519(pk) => {
263                    if sig_bytes.len() != 64 {
264                        return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
265                    }
266
267                    // Create the signature bytes array safely
268                    let mut sig_array = [0u8; 64];
269                    sig_array.copy_from_slice(&sig_bytes[0..64]);
270
271                    // In ed25519_dalek 2.0, from_bytes returns a Signature directly
272                    let signature = ed25519_dalek::Signature::from_bytes(&sig_array);
273
274                    use ed25519_dalek::Verifier;
275                    Ok(pk.verify(&message_bytes, &signature).is_ok())
276                },
277                crate::crypto::PublicKey::Secp256k1(pk) => {
278                    let sig_result = k256::ecdsa::Signature::from_slice(sig_bytes.as_slice());
279                    if sig_result.is_err() {
280                        return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
281                    }
282
283                    use k256::ecdsa::{signature::Verifier, VerifyingKey};
284                    let verifying_key = VerifyingKey::from(pk);
285                    Ok(verifying_key
286                        .verify(&message_bytes, &sig_result.unwrap())
287                        .is_ok())
288                },
289            }
290        } else {
291            Ok(false)
292        }
293    }
294
295    /// Calculate the hash of this message
296    pub fn calculate_hash(&self) -> String {
297        let mut hasher = Sha256::new();
298        if let Ok(id_bytes) = bincode::serialize(self.id.as_uuid()) {
299            hasher.update(&id_bytes);
300        }
301        hasher.update(self.message_type.to_string().as_bytes());
302        hasher.update(self.data.to_string().as_bytes());
303        hasher.update(self.timestamp.to_rfc3339().as_bytes());
304        hex::encode(hasher.finalize())
305    }
306
307    /// Verify the message hash
308    pub fn verify_hash(&self) -> bool {
309        self.hash == self.calculate_hash()
310    }
311
312    /// Get the message size in bytes
313    pub fn size(&self) -> usize {
314        bincode::serialized_size(self).unwrap_or(0) as usize
315    }
316
317    /// Convert message to bytes for signing/verification
318    pub fn to_bytes(&self) -> Result<Vec<u8>> {
319        bincode::serialize(self)
320            .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
321    }
322
323    /// Create message from bytes
324    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
325        bincode::deserialize(bytes)
326            .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
327    }
328
329    /// Serialize to JSON
330    pub fn to_json(&self) -> Result<String> {
331        serde_json::to_string(self)
332            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
333    }
334
335    /// Deserialize from JSON
336    pub fn from_json(json: &str) -> Result<Self> {
337        serde_json::from_str(json)
338            .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
339    }
340}
341
342impl PartialEq for SharedMessage {
343    fn eq(&self, other: &Self) -> bool {
344        self.id == other.id && self.hash == other.hash
345    }
346}
347
348impl Eq for SharedMessage {}
349
350/// Trait for objects that can be shared and synchronized across nodes
351#[async_trait]
352pub trait SharedObject: Send + Sync + Debug {
353    /// Get the unique identifier for this shared object
354    fn id(&self) -> SharedObjectId;
355
356    /// Get the type name of this shared object
357    fn type_name(&self) -> &'static str;
358
359    /// Validate if a message is valid for this shared object
360    async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
361
362    /// Process a validated message and update the object state
363    async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
364
365    /// Check if this object supports merkleized synchronization
366    fn is_merkleized(&self) -> bool;
367
368    /// Get the latest state digest for synchronization
369    async fn get_latest_digest(&self) -> Result<String>;
370
371    /// Check if the object has a specific digest
372    async fn has_digest(&self, digest: &str) -> Result<bool>;
373
374    /// Validate if a digest is valid
375    async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
376
377    /// Add a digest to the object
378    async fn add_digest(&mut self, digest: String) -> Result<bool>;
379
380    /// Get messages for gossip protocol
381    async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
382
383    /// Get messages since a specific digest
384    async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
385
386    /// Get the current state as a serializable value
387    async fn get_state(&self) -> Result<serde_json::Value>;
388
389    /// Reset the object to a clean state
390    async fn reset(&mut self) -> Result<()>;
391
392    /// Serialize the object to JSON
393    fn to_json(&self) -> Result<serde_json::Value>;
394
395    /// Update the object from JSON data
396    async fn apply_json(&mut self, data: serde_json::Value) -> Result<()>;
397
398    /// Clone the object as a trait object
399    fn clone_box(&self) -> Box<dyn SharedObject>;
400
401    /// Get a reference to self as Any for downcasting
402    fn as_any(&self) -> &dyn Any;
403
404    /// Get a mutable reference to self as Any for downcasting
405    fn as_any_mut(&mut self) -> &mut dyn Any;
406
407    /// Called when the object is accessed
408    async fn on_access(&mut self) -> Result<()>;
409
410    /// Called when the object is modified
411    async fn on_modify(&mut self) -> Result<()>;
412
413    /// Called when the object is deleted
414    async fn on_delete(&mut self) -> Result<()>;
415
416    /// Validate the object's current state
417    async fn validate(&self) -> Result<bool>;
418
419    /// Get object metadata
420    fn metadata(&self) -> HashMap<String, String>;
421}
422
423impl Clone for Box<dyn SharedObject> {
424    fn clone(&self) -> Self {
425        self.clone_box()
426    }
427}
428
429/// Digest for tracking shared object state
430#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
431pub struct StateDigest {
432    /// The digest hash
433    pub hash: String,
434    /// Timestamp when digest was created
435    pub timestamp: chrono::DateTime<chrono::Utc>,
436    /// Number of messages included in this digest
437    pub message_count: u64,
438}
439
440impl StateDigest {
441    /// Create a new state digest
442    pub fn new(hash: String, message_count: u64) -> Self {
443        Self {
444            hash,
445            timestamp: chrono::Utc::now(),
446            message_count,
447        }
448    }
449
450    /// Calculate a digest from messages
451    pub fn from_messages(messages: &[SharedMessage]) -> Self {
452        let mut hasher = Sha256::new();
453        for message in messages {
454            hasher.update(message.hash.as_bytes());
455        }
456        let hash = hex::encode(hasher.finalize());
457        Self::new(hash, messages.len() as u64)
458    }
459}
460
461/// Registry for managing shared objects
462pub struct SharedObjectRegistry {
463    objects: HashMap<SharedObjectId, Box<dyn SharedObject>>,
464    #[allow(dead_code)]
465    lock: Arc<RwLock<()>>,
466}
467
468impl Default for SharedObjectRegistry {
469    fn default() -> Self {
470        Self::new()
471    }
472}
473
474impl SharedObjectRegistry {
475    /// Create a new empty registry
476    pub fn new() -> Self {
477        Self {
478            objects: HashMap::new(),
479            lock: Arc::new(RwLock::new(())),
480        }
481    }
482
483    /// Register a shared object
484    pub fn register(&mut self, object: Box<dyn SharedObject>) -> SharedObjectId {
485        let id = object.id();
486        self.objects.insert(id.clone(), object);
487        id
488    }
489
490    /// Get a reference to an object by ID
491    pub fn get(&self, id: &SharedObjectId) -> Option<&dyn SharedObject> {
492        self.objects.get(id).map(|obj| obj.as_ref())
493    }
494
495    /// Remove an object from the registry
496    pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn SharedObject>> {
497        self.objects.remove(id)
498    }
499
500    /// Get all object IDs
501    pub fn ids(&self) -> Vec<SharedObjectId> {
502        self.objects.keys().cloned().collect()
503    }
504
505    /// Get the number of registered objects
506    pub fn len(&self) -> usize {
507        self.objects.len()
508    }
509
510    /// Check if the registry is empty
511    pub fn is_empty(&self) -> bool {
512        self.objects.is_empty()
513    }
514
515    /// Clear all objects from the registry
516    pub fn clear(&mut self) {
517        self.objects.clear();
518    }
519
520    /// Get objects by type
521    pub fn get_by_type(&self, type_name: &str) -> Vec<&dyn SharedObject> {
522        self.objects
523            .values()
524            .filter(|obj| obj.type_name() == type_name)
525            .map(|obj| obj.as_ref())
526            .collect()
527    }
528
529    /// Check if an object exists
530    pub fn contains(&self, id: &SharedObjectId) -> bool {
531        self.objects.contains_key(id)
532    }
533}
534
535impl Debug for SharedObjectRegistry {
536    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537        f.debug_struct("SharedObjectRegistry")
538            .field("object_count", &self.objects.len())
539            .field("object_ids", &self.ids())
540            .finish()
541    }
542}