1use crate::error::{ChaincraftError, CryptoError, Result, SerializationError};
4use async_trait::async_trait;
5use bincode;
6use hex;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::any::Any;
10use std::collections::HashMap;
11use std::fmt::{self, Debug};
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct SharedObjectId(Uuid);
19
20impl SharedObjectId {
21 pub fn new() -> Self {
22 Self(Uuid::new_v4())
23 }
24
25 pub fn as_uuid(&self) -> &Uuid {
26 &self.0
27 }
28
29 pub fn into_uuid(self) -> Uuid {
30 self.0
31 }
32
33 pub fn from_uuid(uuid: Uuid) -> Self {
34 Self(uuid)
35 }
36}
37
38impl fmt::Display for SharedObjectId {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 write!(f, "{}", self.0)
41 }
42}
43
44impl Default for SharedObjectId {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
/// Kind tag carried by every [`SharedMessage`].
///
/// Each built-in variant has an upper-snake-case wire name (shown on the
/// variant) that the `Display`, `Serialize` and `Deserialize` impls in this
/// file use. Anything else is represented by [`MessageType::Custom`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// Wire name `PEER_DISCOVERY`.
    PeerDiscovery,
    /// Wire name `REQUEST_LOCAL_PEERS`.
    RequestLocalPeers,
    /// Wire name `LOCAL_PEERS`.
    LocalPeers,
    /// Wire name `REQUEST_SHARED_OBJECT_UPDATE`.
    RequestSharedObjectUpdate,
    /// Wire name `SHARED_OBJECT_UPDATE`.
    SharedObjectUpdate,
    /// Wire name `GET`.
    Get,
    /// Wire name `SET`.
    Set,
    /// Wire name `DELETE`.
    Delete,
    /// Wire name `RESPONSE`.
    Response,
    /// Wire name `NOTIFICATION`.
    Notification,
    /// Wire name `HEARTBEAT`.
    Heartbeat,
    /// Wire name `ERROR`.
    Error,
    /// Wire name `REQUEST_DIGEST`.
    RequestDigest,
    /// Wire name `REQUEST_MESSAGES_SINCE`.
    RequestMessagesSince,
    /// Wire name `DIGEST_RESPONSE`.
    DigestResponse,
    /// Wire name `MESSAGES_RESPONSE`.
    MessagesResponse,
    /// Application-defined message kind identified by an arbitrary name.
    Custom(String),
}
88
89impl Serialize for MessageType {
90 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
91 where
92 S: serde::Serializer,
93 {
94 match self {
95 MessageType::Custom(name) => {
96 #[derive(Serialize)]
97 struct Custom {
98 #[serde(rename = "Custom")]
99 custom: String,
100 }
101 Custom {
102 custom: name.clone(),
103 }
104 .serialize(serializer)
105 },
106 _ => serializer.serialize_str(&self.to_string()),
107 }
108 }
109}
110
111impl<'de> Deserialize<'de> for MessageType {
112 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
113 where
114 D: serde::Deserializer<'de>,
115 {
116 use serde::de::{Error, Visitor};
117 use std::fmt;
118
119 struct MessageTypeVisitor;
120
121 impl<'de> Visitor<'de> for MessageTypeVisitor {
122 type Value = MessageType;
123
124 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
125 formatter.write_str("a message type string or Custom object")
126 }
127
128 fn visit_str<E>(self, value: &str) -> std::result::Result<MessageType, E>
129 where
130 E: Error,
131 {
132 match value {
133 "PEER_DISCOVERY" => Ok(MessageType::PeerDiscovery),
134 "REQUEST_LOCAL_PEERS" => Ok(MessageType::RequestLocalPeers),
135 "LOCAL_PEERS" => Ok(MessageType::LocalPeers),
136 "REQUEST_SHARED_OBJECT_UPDATE" => Ok(MessageType::RequestSharedObjectUpdate),
137 "SHARED_OBJECT_UPDATE" => Ok(MessageType::SharedObjectUpdate),
138 "GET" => Ok(MessageType::Get),
139 "SET" => Ok(MessageType::Set),
140 "DELETE" => Ok(MessageType::Delete),
141 "RESPONSE" => Ok(MessageType::Response),
142 "NOTIFICATION" => Ok(MessageType::Notification),
143 "HEARTBEAT" => Ok(MessageType::Heartbeat),
144 "ERROR" => Ok(MessageType::Error),
145 "REQUEST_DIGEST" => Ok(MessageType::RequestDigest),
146 "REQUEST_MESSAGES_SINCE" => Ok(MessageType::RequestMessagesSince),
147 "DIGEST_RESPONSE" => Ok(MessageType::DigestResponse),
148 "MESSAGES_RESPONSE" => Ok(MessageType::MessagesResponse),
149 _ => Ok(MessageType::Custom(value.to_string())),
150 }
151 }
152
153 fn visit_map<A>(self, mut map: A) -> std::result::Result<MessageType, A::Error>
154 where
155 A: serde::de::MapAccess<'de>,
156 {
157 if let Some(key) = map.next_key::<String>()? {
158 if key == "Custom" {
159 let value: String = map.next_value()?;
160 Ok(MessageType::Custom(value))
161 } else {
162 Err(Error::unknown_field(&key, &["Custom"]))
163 }
164 } else {
165 Err(Error::missing_field("Custom"))
166 }
167 }
168 }
169
170 deserializer.deserialize_any(MessageTypeVisitor)
171 }
172}
173
174impl fmt::Display for MessageType {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 match self {
177 MessageType::PeerDiscovery => write!(f, "PEER_DISCOVERY"),
178 MessageType::RequestLocalPeers => write!(f, "REQUEST_LOCAL_PEERS"),
179 MessageType::LocalPeers => write!(f, "LOCAL_PEERS"),
180 MessageType::RequestSharedObjectUpdate => write!(f, "REQUEST_SHARED_OBJECT_UPDATE"),
181 MessageType::SharedObjectUpdate => write!(f, "SHARED_OBJECT_UPDATE"),
182 MessageType::Get => write!(f, "GET"),
183 MessageType::Set => write!(f, "SET"),
184 MessageType::Delete => write!(f, "DELETE"),
185 MessageType::Response => write!(f, "RESPONSE"),
186 MessageType::Notification => write!(f, "NOTIFICATION"),
187 MessageType::Heartbeat => write!(f, "HEARTBEAT"),
188 MessageType::Error => write!(f, "ERROR"),
189 MessageType::RequestDigest => write!(f, "REQUEST_DIGEST"),
190 MessageType::RequestMessagesSince => write!(f, "REQUEST_MESSAGES_SINCE"),
191 MessageType::DigestResponse => write!(f, "DIGEST_RESPONSE"),
192 MessageType::MessagesResponse => write!(f, "MESSAGES_RESPONSE"),
193 MessageType::Custom(name) => write!(f, "{}", name),
194 }
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct SharedMessage {
201 pub id: SharedObjectId,
203 pub message_type: MessageType,
205 pub target_id: Option<SharedObjectId>,
207 pub data: serde_json::Value,
209 pub timestamp: chrono::DateTime<chrono::Utc>,
211 #[serde(with = "serde_bytes", default)]
213 pub signature: Option<Vec<u8>>,
214 pub hash: String,
216}
217
218impl SharedMessage {
219 pub fn new(message_type: MessageType, data: serde_json::Value) -> Self {
221 let mut message = Self {
222 id: SharedObjectId::new(),
223 message_type,
224 target_id: None,
225 data,
226 timestamp: chrono::Utc::now(),
227 signature: None,
228 hash: String::new(),
229 };
230 message.hash = message.calculate_hash();
231 message
232 }
233
234 pub fn new_with_target(
236 message_type: MessageType,
237 target_id: SharedObjectId,
238 data: serde_json::Value,
239 ) -> Self {
240 let mut message = Self {
241 id: SharedObjectId::new(),
242 message_type,
243 target_id: Some(target_id),
244 data,
245 timestamp: chrono::Utc::now(),
246 signature: None,
247 hash: String::new(),
248 };
249 message.hash = message.calculate_hash();
250 message
251 }
252
253 pub fn custom<T: Serialize>(message_type: impl Into<String>, data: T) -> Result<Self> {
255 let data = serde_json::to_value(data)
256 .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))?;
257 Ok(Self::new(MessageType::Custom(message_type.into()), data))
258 }
259
260 pub fn sign(&mut self, private_key: &crate::crypto::PrivateKey) -> Result<()> {
262 let message_bytes = self.to_bytes()?;
263 let signature = private_key.sign(&message_bytes)?;
264 self.signature = Some(signature.to_bytes());
265 Ok(())
266 }
267
268 pub fn verify_signature(&self, public_key: &crate::crypto::PublicKey) -> Result<bool> {
270 if let Some(sig_bytes) = &self.signature {
271 let mut message_copy = self.clone();
273 message_copy.signature = None;
274 let message_bytes = message_copy.to_bytes()?;
275
276 match public_key {
278 crate::crypto::PublicKey::Ed25519(pk) => {
279 if sig_bytes.len() != 64 {
280 return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
281 }
282
283 let mut sig_array = [0u8; 64];
285 sig_array.copy_from_slice(&sig_bytes[0..64]);
286
287 let signature = ed25519_dalek::Signature::from_bytes(&sig_array);
289
290 use ed25519_dalek::Verifier;
291 Ok(pk.verify(&message_bytes, &signature).is_ok())
292 },
293 crate::crypto::PublicKey::Secp256k1(pk) => {
294 let sig_result = k256::ecdsa::Signature::from_slice(sig_bytes.as_slice());
295 if sig_result.is_err() {
296 return Err(ChaincraftError::Crypto(CryptoError::InvalidSignature));
297 }
298
299 use k256::ecdsa::{signature::Verifier, VerifyingKey};
300 let verifying_key = VerifyingKey::from(pk);
301 Ok(verifying_key
302 .verify(&message_bytes, &sig_result.unwrap())
303 .is_ok())
304 },
305 }
306 } else {
307 Ok(false)
308 }
309 }
310
311 pub fn calculate_hash(&self) -> String {
313 let mut hasher = Sha256::new();
314 if let Ok(id_bytes) = bincode::serialize(self.id.as_uuid()) {
315 hasher.update(&id_bytes);
316 }
317 hasher.update(self.message_type.to_string().as_bytes());
318 hasher.update(self.data.to_string().as_bytes());
319 hasher.update(self.timestamp.to_rfc3339().as_bytes());
320 hex::encode(hasher.finalize())
321 }
322
323 pub fn verify_hash(&self) -> bool {
325 self.hash == self.calculate_hash()
326 }
327
328 pub fn size(&self) -> usize {
330 bincode::serialized_size(self).unwrap_or(0) as usize
331 }
332
333 pub fn to_bytes(&self) -> Result<Vec<u8>> {
335 bincode::serialize(self)
336 .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
337 }
338
339 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
341 bincode::deserialize(bytes)
342 .map_err(|e| ChaincraftError::Serialization(SerializationError::Binary(e)))
343 }
344
345 pub fn to_json(&self) -> Result<String> {
347 serde_json::to_string(self)
348 .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
349 }
350
351 pub fn from_json(json: &str) -> Result<Self> {
353 serde_json::from_str(json)
354 .map_err(|e| ChaincraftError::Serialization(SerializationError::Json(e)))
355 }
356}
357
358impl PartialEq for SharedMessage {
359 fn eq(&self, other: &Self) -> bool {
360 self.id == other.id && self.hash == other.hash
361 }
362}
363
364impl Eq for SharedMessage {}
365
366#[async_trait]
368pub trait SharedObject: Send + Sync + Debug {
369 fn id(&self) -> SharedObjectId;
371
372 fn type_name(&self) -> &'static str;
374
375 async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
377
378 async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
380
381 fn is_merkleized(&self) -> bool;
383
384 async fn get_latest_digest(&self) -> Result<String>;
386
387 async fn has_digest(&self, digest: &str) -> Result<bool>;
389
390 async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
392
393 async fn add_digest(&mut self, digest: String) -> Result<bool>;
395
396 async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
398
399 async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
401
402 async fn get_state(&self) -> Result<serde_json::Value>;
404
405 async fn reset(&mut self) -> Result<()>;
407
408 fn to_json(&self) -> Result<serde_json::Value>;
410
411 async fn apply_json(&mut self, data: serde_json::Value) -> Result<()>;
413
414 fn clone_box(&self) -> Box<dyn SharedObject>;
416
417 fn as_any(&self) -> &dyn Any;
419
420 fn as_any_mut(&mut self) -> &mut dyn Any;
422
423 async fn on_access(&mut self) -> Result<()>;
425
426 async fn on_modify(&mut self) -> Result<()>;
428
429 async fn on_delete(&mut self) -> Result<()>;
431
432 async fn validate(&self) -> Result<bool>;
434
435 fn metadata(&self) -> HashMap<String, String>;
437}
438
439impl Clone for Box<dyn SharedObject> {
440 fn clone(&self) -> Self {
441 self.clone_box()
442 }
443}
444
445#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
447pub struct StateDigest {
448 pub hash: String,
450 pub timestamp: chrono::DateTime<chrono::Utc>,
452 pub message_count: u64,
454}
455
456impl StateDigest {
457 pub fn new(hash: String, message_count: u64) -> Self {
459 Self {
460 hash,
461 timestamp: chrono::Utc::now(),
462 message_count,
463 }
464 }
465
466 pub fn from_messages(messages: &[SharedMessage]) -> Self {
468 let mut hasher = Sha256::new();
469 for message in messages {
470 hasher.update(message.hash.as_bytes());
471 }
472 let hash = hex::encode(hasher.finalize());
473 Self::new(hash, messages.len() as u64)
474 }
475}
476
477pub struct SharedObjectRegistry {
479 objects: HashMap<SharedObjectId, Box<dyn SharedObject>>,
480 #[allow(dead_code)]
481 lock: Arc<RwLock<()>>,
482}
483
484impl Default for SharedObjectRegistry {
485 fn default() -> Self {
486 Self::new()
487 }
488}
489
490impl SharedObjectRegistry {
491 pub fn new() -> Self {
493 Self {
494 objects: HashMap::new(),
495 lock: Arc::new(RwLock::new(())),
496 }
497 }
498
499 pub fn register(&mut self, object: Box<dyn SharedObject>) -> SharedObjectId {
501 let id = object.id();
502 self.objects.insert(id.clone(), object);
503 id
504 }
505
506 pub fn get(&self, id: &SharedObjectId) -> Option<&dyn SharedObject> {
508 self.objects.get(id).map(|obj| obj.as_ref())
509 }
510
511 pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn SharedObject>> {
513 self.objects.remove(id)
514 }
515
516 pub fn ids(&self) -> Vec<SharedObjectId> {
518 self.objects.keys().cloned().collect()
519 }
520
521 pub fn len(&self) -> usize {
523 self.objects.len()
524 }
525
526 pub fn is_empty(&self) -> bool {
528 self.objects.is_empty()
529 }
530
531 pub fn clear(&mut self) {
533 self.objects.clear();
534 }
535
536 pub fn get_by_type(&self, type_name: &str) -> Vec<&dyn SharedObject> {
538 self.objects
539 .values()
540 .filter(|obj| obj.type_name() == type_name)
541 .map(|obj| obj.as_ref())
542 .collect()
543 }
544
545 pub fn contains(&self, id: &SharedObjectId) -> bool {
547 self.objects.contains_key(id)
548 }
549}
550
551impl Debug for SharedObjectRegistry {
552 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
553 f.debug_struct("SharedObjectRegistry")
554 .field("object_count", &self.objects.len())
555 .field("object_ids", &self.ids())
556 .finish()
557 }
558}