1pub use crate::shared::SharedObjectId;
4use crate::{
5 error::{ChaincraftError, Result},
6 shared::{MessageType, SharedMessage, SharedObject},
7};
8use async_trait::async_trait;
9use chrono;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha2::{Digest, Sha256};
13use std::any::Any;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18#[async_trait]
20pub trait ApplicationObject: Send + Sync + std::fmt::Debug {
21 fn id(&self) -> &SharedObjectId;
23
24 fn type_name(&self) -> &'static str;
26
27 async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
29
30 async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
32
33 fn is_merkleized(&self) -> bool;
35
36 async fn get_latest_digest(&self) -> Result<String>;
38
39 async fn has_digest(&self, digest: &str) -> Result<bool>;
41
42 async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
44
45 async fn add_digest(&mut self, digest: String) -> Result<bool>;
47
48 async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
50
51 async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
53
54 async fn get_state(&self) -> Result<Value>;
56
57 async fn reset(&mut self) -> Result<()>;
59
60 fn clone_box(&self) -> Box<dyn ApplicationObject>;
62
63 fn as_any(&self) -> &dyn Any;
65
66 fn as_any_mut(&mut self) -> &mut dyn Any;
68}
69
70#[derive(Debug, Clone)]
72pub struct SimpleSharedNumber {
73 id: SharedObjectId,
74 number: i64,
75 created_at: chrono::DateTime<chrono::Utc>,
76 updated_at: chrono::DateTime<chrono::Utc>,
77 locked: bool,
78 messages: Vec<SharedMessage>,
79 seen_hashes: HashSet<String>,
80 digests: Vec<String>,
81}
82
83impl SimpleSharedNumber {
84 pub fn new() -> Self {
85 Self {
86 id: SharedObjectId::new(),
87 number: 0,
88 created_at: chrono::Utc::now(),
89 updated_at: chrono::Utc::now(),
90 locked: false,
91 messages: Vec::new(),
92 seen_hashes: HashSet::new(),
93 digests: Vec::new(),
94 }
95 }
96
97 pub fn get_number(&self) -> i64 {
98 self.number
99 }
100
101 pub fn get_messages(&self) -> &[SharedMessage] {
102 &self.messages
103 }
104
105 fn calculate_message_hash(data: &Value) -> String {
106 let data_str = serde_json::to_string(&serde_json::json!({
107 "content": data
108 }))
109 .unwrap_or_default();
110 let mut hasher = Sha256::new();
111 hasher.update(data_str.as_bytes());
112 hex::encode(hasher.finalize())
113 }
114}
115
116impl Default for SimpleSharedNumber {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122#[async_trait]
123impl ApplicationObject for SimpleSharedNumber {
124 fn id(&self) -> &SharedObjectId {
125 &self.id
126 }
127
128 fn type_name(&self) -> &'static str {
129 "SimpleSharedNumber"
130 }
131
132 async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
133 Ok(message.data.is_i64())
135 }
136
137 async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
138 let msg_hash = Self::calculate_message_hash(&message.data);
140
141 if self.seen_hashes.contains(&msg_hash) {
142 return Ok(());
144 }
145
146 self.seen_hashes.insert(msg_hash);
147
148 if let Some(value) = message.data.as_i64() {
150 self.number += value;
151 self.messages.push(message);
152 tracing::info!("SimpleSharedNumber: Added message with data: {}", value);
153 }
154
155 Ok(())
156 }
157
158 fn is_merkleized(&self) -> bool {
159 false
160 }
161
162 async fn get_latest_digest(&self) -> Result<String> {
163 Ok(self.number.to_string())
164 }
165
166 async fn has_digest(&self, digest: &str) -> Result<bool> {
167 Ok(self.digests.contains(&digest.to_string()))
168 }
169
170 async fn is_valid_digest(&self, _digest: &str) -> Result<bool> {
171 Ok(true)
172 }
173
174 async fn add_digest(&mut self, digest: String) -> Result<bool> {
175 self.digests.push(digest);
176 Ok(true)
177 }
178
179 async fn gossip_messages(&self, _digest: Option<&str>) -> Result<Vec<SharedMessage>> {
180 Ok(Vec::new())
181 }
182
183 async fn get_messages_since_digest(&self, _digest: &str) -> Result<Vec<SharedMessage>> {
184 Ok(Vec::new())
185 }
186
187 async fn get_state(&self) -> Result<Value> {
188 Ok(serde_json::json!({
189 "number": self.number,
190 "message_count": self.messages.len(),
191 "seen_hashes_count": self.seen_hashes.len()
192 }))
193 }
194
195 async fn reset(&mut self) -> Result<()> {
196 self.number = 0;
197 self.messages.clear();
198 self.seen_hashes.clear();
199 self.digests.clear();
200 Ok(())
201 }
202
203 fn clone_box(&self) -> Box<dyn ApplicationObject> {
204 Box::new(self.clone())
205 }
206
207 fn as_any(&self) -> &dyn Any {
208 self
209 }
210
211 fn as_any_mut(&mut self) -> &mut dyn Any {
212 self
213 }
214}
215
216#[derive(Debug, Clone)]
222pub struct MerkelizedChain {
223 id: SharedObjectId,
224 chain: Vec<String>,
226 messages: Vec<SharedMessage>,
228 hash_set: HashSet<String>,
230 created_at: chrono::DateTime<chrono::Utc>,
231}
232
233impl MerkelizedChain {
234 pub fn new() -> Self {
236 let genesis = Self::calculate_hash("genesis");
237 Self {
238 id: SharedObjectId::new(),
239 chain: vec![genesis.clone()],
240 messages: vec![SharedMessage::new(
241 MessageType::Custom("genesis".to_string()),
242 serde_json::json!("genesis"),
243 )],
244 hash_set: {
245 let mut set = HashSet::new();
246 set.insert(genesis);
247 set
248 },
249 created_at: chrono::Utc::now(),
250 }
251 }
252
253 pub fn calculate_hash(data: &str) -> String {
255 let mut hasher = Sha256::new();
256 hasher.update(data.as_bytes());
257 hex::encode(hasher.finalize())
258 }
259
260 pub fn calculate_next_hash(prev_hash: &str) -> String {
262 Self::calculate_hash(prev_hash)
263 }
264
265 pub fn chain_length(&self) -> usize {
267 self.chain.len()
268 }
269
270 pub fn genesis_hash(&self) -> &str {
272 &self.chain[0]
273 }
274
275 pub fn latest_hash(&self) -> &str {
277 self.chain.last().expect("chain is never empty")
278 }
279
280 pub fn hash_at(&self, index: usize) -> Option<&str> {
282 self.chain.get(index).map(|s| s.as_str())
283 }
284
285 pub fn is_valid_next_hash(&self, hash: &str) -> bool {
287 let expected = Self::calculate_next_hash(self.latest_hash());
288 hash == expected
289 }
290
291 pub fn add_next_hash(&mut self) -> String {
293 let next_hash = Self::calculate_next_hash(self.latest_hash());
294 self.chain.push(next_hash.clone());
295 self.hash_set.insert(next_hash.clone());
296
297 let msg = SharedMessage::new(
299 MessageType::Custom("chain_update".to_string()),
300 serde_json::json!(next_hash),
301 );
302 self.messages.push(msg);
303
304 next_hash
305 }
306
307 pub fn try_add_hash(&mut self, hash: &str) -> bool {
310 if self.hash_set.contains(hash) {
312 return false;
313 }
314
315 for i in 0..self.chain.len() {
317 let expected_next = Self::calculate_next_hash(&self.chain[i]);
318 if hash == expected_next {
319 self.chain.push(hash.to_string());
320 self.hash_set.insert(hash.to_string());
321
322 let msg = SharedMessage::new(
323 MessageType::Custom("chain_update".to_string()),
324 serde_json::json!(hash),
325 );
326 self.messages.push(msg);
327
328 return true;
329 }
330 }
331
332 false
333 }
334
335 pub fn chain(&self) -> &[String] {
337 &self.chain
338 }
339
340 pub fn find_hash_index(&self, hash: &str) -> Option<usize> {
342 self.chain.iter().position(|h| h == hash)
343 }
344}
345
346impl Default for MerkelizedChain {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352#[async_trait]
353impl ApplicationObject for MerkelizedChain {
354 fn id(&self) -> &SharedObjectId {
355 &self.id
356 }
357
358 fn type_name(&self) -> &'static str {
359 "MerkelizedChain"
360 }
361
362 async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
363 let Some(hash) = message.data.as_str() else {
365 return Ok(false);
366 };
367
368 if self.hash_set.contains(hash) {
370 return Ok(true);
371 }
372
373 for i in 0..self.chain.len() {
375 let expected_next = Self::calculate_next_hash(&self.chain[i]);
376 if hash == expected_next {
377 return Ok(true);
378 }
379 }
380
381 Ok(false)
382 }
383
384 async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
385 let Some(hash) = message.data.as_str() else {
386 return Ok(());
387 };
388
389 if self.hash_set.contains(hash) {
391 return Ok(());
392 }
393
394 if self.try_add_hash(hash) {
396 tracing::info!("MerkelizedChain: Added hash {} to chain (length: {})",
397 &hash[..8.min(hash.len())], self.chain.len());
398 }
399
400 Ok(())
401 }
402
403 fn is_merkleized(&self) -> bool {
404 true
405 }
406
407 async fn get_latest_digest(&self) -> Result<String> {
408 Ok(self.latest_hash().to_string())
409 }
410
411 async fn has_digest(&self, digest: &str) -> Result<bool> {
412 Ok(self.hash_set.contains(digest))
413 }
414
415 async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
416 Ok(self.hash_set.contains(digest) || self.is_valid_next_hash(digest))
417 }
418
419 async fn add_digest(&mut self, digest: String) -> Result<bool> {
420 if self.try_add_hash(&digest) {
421 Ok(true)
422 } else {
423 Ok(false)
424 }
425 }
426
427 async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
428 let start_index = match digest {
429 Some(hash) => {
430 match self.find_hash_index(hash) {
431 Some(idx) => idx + 1, None => return Ok(Vec::new()), }
434 }
435 None => 1, };
437
438 let mut result = Vec::new();
439 for i in start_index..self.chain.len() {
440 let msg = SharedMessage::new(
441 MessageType::Custom("chain_update".to_string()),
442 serde_json::json!(self.chain[i]),
443 );
444 result.push(msg);
445 }
446
447 Ok(result)
448 }
449
450 async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
451 self.gossip_messages(Some(digest)).await
452 }
453
454 async fn get_state(&self) -> Result<Value> {
455 Ok(serde_json::json!({
456 "chain_length": self.chain.len(),
457 "latest_hash": self.latest_hash(),
458 "genesis_hash": self.genesis_hash(),
459 }))
460 }
461
462 async fn reset(&mut self) -> Result<()> {
463 let genesis = Self::calculate_hash("genesis");
464 self.chain = vec![genesis.clone()];
465 self.hash_set = {
466 let mut set = HashSet::new();
467 set.insert(genesis);
468 set
469 };
470 self.messages = vec![SharedMessage::new(
471 MessageType::Custom("genesis".to_string()),
472 serde_json::json!("genesis"),
473 )];
474 Ok(())
475 }
476
477 fn clone_box(&self) -> Box<dyn ApplicationObject> {
478 Box::new(self.clone())
479 }
480
481 fn as_any(&self) -> &dyn Any {
482 self
483 }
484
485 fn as_any_mut(&mut self) -> &mut dyn Any {
486 self
487 }
488}
489
490#[derive(Debug, Clone)]
494pub struct MessageChain {
495 id: SharedObjectId,
496 messages: Vec<SharedMessage>,
497 seen_hashes: HashSet<String>,
498 digests: Vec<String>,
499}
500
501impl MessageChain {
502 pub fn new() -> Self {
503 Self {
504 id: SharedObjectId::new(),
505 messages: Vec::new(),
506 seen_hashes: HashSet::new(),
507 digests: Vec::new(),
508 }
509 }
510
511 pub fn len(&self) -> usize {
512 self.messages.len()
513 }
514
515 pub fn messages(&self) -> &[SharedMessage] {
516 &self.messages
517 }
518
519 fn msg_hash(msg: &SharedMessage) -> String {
520 msg.hash.clone()
521 }
522}
523
524impl Default for MessageChain {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530#[async_trait]
531impl ApplicationObject for MessageChain {
532 fn id(&self) -> &SharedObjectId {
533 &self.id
534 }
535
536 fn type_name(&self) -> &'static str {
537 "MessageChain"
538 }
539
540 async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
541 Ok(!message.data.is_null())
542 }
543
544 async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
545 let h = Self::msg_hash(&message);
546 if self.seen_hashes.contains(&h) {
547 return Ok(());
548 }
549 self.seen_hashes.insert(h);
550 self.messages.push(message);
551 Ok(())
552 }
553
554 fn is_merkleized(&self) -> bool {
555 true
556 }
557
558 async fn get_latest_digest(&self) -> Result<String> {
559 Ok(self
560 .messages
561 .last()
562 .map(|m| m.hash.clone())
563 .unwrap_or_else(|| "genesis".to_string()))
564 }
565
566 async fn has_digest(&self, digest: &str) -> Result<bool> {
567 Ok(self.digests.contains(&digest.to_string())
568 || self.messages.iter().any(|m| m.hash == digest))
569 }
570
571 async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
572 Ok(self.has_digest(digest).await?
573 || digest == "genesis"
574 || self.seen_hashes.contains(digest))
575 }
576
577 async fn add_digest(&mut self, _digest: String) -> Result<bool> {
578 Ok(false)
579 }
580
581 async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
582 let start = match digest {
583 Some(d) if d != "genesis" => {
584 self.messages
585 .iter()
586 .position(|m| m.hash == d)
587 .map(|i| i + 1)
588 .unwrap_or(0)
589 }
590 _ => 0,
591 };
592 Ok(self.messages[start..].to_vec())
593 }
594
595 async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
596 self.gossip_messages(Some(digest)).await
597 }
598
599 async fn get_state(&self) -> Result<Value> {
600 Ok(serde_json::json!({
601 "length": self.messages.len(),
602 "message_count": self.messages.len()
603 }))
604 }
605
606 async fn reset(&mut self) -> Result<()> {
607 self.messages.clear();
608 self.seen_hashes.clear();
609 self.digests.clear();
610 Ok(())
611 }
612
613 fn clone_box(&self) -> Box<dyn ApplicationObject> {
614 Box::new(self.clone())
615 }
616
617 fn as_any(&self) -> &dyn Any {
618 self
619 }
620
621 fn as_any_mut(&mut self) -> &mut dyn Any {
622 self
623 }
624}
625
626#[derive(Debug)]
628pub struct ApplicationObjectRegistry {
629 objects: HashMap<SharedObjectId, Box<dyn ApplicationObject>>,
630 objects_by_type: HashMap<String, Vec<SharedObjectId>>,
631}
632
633impl ApplicationObjectRegistry {
634 pub fn new() -> Self {
635 Self {
636 objects: HashMap::new(),
637 objects_by_type: HashMap::new(),
638 }
639 }
640
641 pub fn register(&mut self, object: Box<dyn ApplicationObject>) -> SharedObjectId {
643 let id = object.id().clone();
644 let type_name = object.type_name().to_string();
645
646 self.objects_by_type
647 .entry(type_name)
648 .or_default()
649 .push(id.clone());
650
651 self.objects.insert(id.clone(), object);
652 id
653 }
654
655 pub fn get(&self, id: &SharedObjectId) -> Option<&dyn ApplicationObject> {
657 self.objects.get(id).map(|obj| obj.as_ref())
658 }
659
660 pub fn get_by_type(&self, type_name: &str) -> Vec<Box<dyn ApplicationObject>> {
662 self.objects_by_type
663 .get(type_name)
664 .map(|ids| {
665 ids.iter()
666 .filter_map(|id| self.objects.get(id))
667 .map(|obj| obj.clone_box())
668 .collect()
669 })
670 .unwrap_or_default()
671 }
672
673 pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn ApplicationObject>> {
675 if let Some(object) = self.objects.remove(id) {
676 let type_name = object.type_name().to_string();
677 if let Some(type_list) = self.objects_by_type.get_mut(&type_name) {
678 type_list.retain(|obj_id| obj_id != id);
679 if type_list.is_empty() {
680 self.objects_by_type.remove(&type_name);
681 }
682 }
683 Some(object)
684 } else {
685 None
686 }
687 }
688
689 pub fn ids(&self) -> Vec<SharedObjectId> {
691 self.objects.keys().cloned().collect()
692 }
693
694 pub fn len(&self) -> usize {
696 self.objects.len()
697 }
698
699 pub fn is_empty(&self) -> bool {
701 self.objects.is_empty()
702 }
703
704 pub fn clear(&mut self) {
706 self.objects.clear();
707 self.objects_by_type.clear();
708 }
709
710 pub async fn process_message(&mut self, message: SharedMessage) -> Result<Vec<SharedObjectId>> {
712 let mut processed_objects = Vec::new();
713
714 let ids: Vec<SharedObjectId> = self.objects.keys().cloned().collect();
716
717 for id in ids {
719 let is_valid = if let Some(object) = self.objects.get(&id) {
721 object.is_valid(&message).await?
722 } else {
723 false
724 };
725
726 if is_valid {
728 if let Some(object) = self.objects.get_mut(&id) {
729 object.add_message(message.clone()).await?;
730 processed_objects.push(id);
731 }
732 }
733 }
734
735 Ok(processed_objects)
736 }
737}
738
739impl Default for ApplicationObjectRegistry {
740 fn default() -> Self {
741 Self::new()
742 }
743}