pub use crate::shared::SharedObjectId;
use crate::{
error::{ChaincraftError, Result},
shared::{MessageType, SharedMessage, SharedObject},
};
use async_trait::async_trait;
use chrono;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
#[async_trait]
pub trait ApplicationObject: Send + Sync + std::fmt::Debug {
fn id(&self) -> &SharedObjectId;
fn type_name(&self) -> &'static str;
async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
fn is_merkleized(&self) -> bool;
async fn get_latest_digest(&self) -> Result<String>;
async fn has_digest(&self, digest: &str) -> Result<bool>;
async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
async fn add_digest(&mut self, digest: String) -> Result<bool>;
async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
async fn get_state(&self) -> Result<Value>;
async fn reset(&mut self) -> Result<()>;
fn clone_box(&self) -> Box<dyn ApplicationObject>;
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
#[derive(Debug, Clone)]
pub struct SimpleSharedNumber {
id: SharedObjectId,
number: i64,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
locked: bool,
messages: Vec<SharedMessage>,
seen_hashes: HashSet<String>,
digests: Vec<String>,
}
impl SimpleSharedNumber {
pub fn new() -> Self {
Self {
id: SharedObjectId::new(),
number: 0,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
locked: false,
messages: Vec::new(),
seen_hashes: HashSet::new(),
digests: Vec::new(),
}
}
pub fn get_number(&self) -> i64 {
self.number
}
pub fn get_messages(&self) -> &[SharedMessage] {
&self.messages
}
fn calculate_message_hash(data: &Value) -> String {
let data_str = serde_json::to_string(&serde_json::json!({
"content": data
}))
.unwrap_or_default();
let mut hasher = Sha256::new();
hasher.update(data_str.as_bytes());
hex::encode(hasher.finalize())
}
}
impl Default for SimpleSharedNumber {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ApplicationObject for SimpleSharedNumber {
fn id(&self) -> &SharedObjectId {
&self.id
}
fn type_name(&self) -> &'static str {
"SimpleSharedNumber"
}
async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
Ok(message.data.is_i64())
}
async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
let msg_hash = Self::calculate_message_hash(&message.data);
if self.seen_hashes.contains(&msg_hash) {
return Ok(());
}
self.seen_hashes.insert(msg_hash);
if let Some(value) = message.data.as_i64() {
self.number += value;
self.messages.push(message);
tracing::info!("SimpleSharedNumber: Added message with data: {}", value);
}
Ok(())
}
fn is_merkleized(&self) -> bool {
false
}
async fn get_latest_digest(&self) -> Result<String> {
Ok(self.number.to_string())
}
async fn has_digest(&self, digest: &str) -> Result<bool> {
Ok(self.digests.contains(&digest.to_string()))
}
async fn is_valid_digest(&self, _digest: &str) -> Result<bool> {
Ok(true)
}
async fn add_digest(&mut self, digest: String) -> Result<bool> {
self.digests.push(digest);
Ok(true)
}
async fn gossip_messages(&self, _digest: Option<&str>) -> Result<Vec<SharedMessage>> {
Ok(Vec::new())
}
async fn get_messages_since_digest(&self, _digest: &str) -> Result<Vec<SharedMessage>> {
Ok(Vec::new())
}
async fn get_state(&self) -> Result<Value> {
Ok(serde_json::json!({
"number": self.number,
"message_count": self.messages.len(),
"seen_hashes_count": self.seen_hashes.len()
}))
}
async fn reset(&mut self) -> Result<()> {
self.number = 0;
self.messages.clear();
self.seen_hashes.clear();
self.digests.clear();
Ok(())
}
fn clone_box(&self) -> Box<dyn ApplicationObject> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
#[derive(Debug, Clone)]
pub struct MerkelizedChain {
id: SharedObjectId,
chain: Vec<String>,
messages: Vec<SharedMessage>,
hash_set: HashSet<String>,
created_at: chrono::DateTime<chrono::Utc>,
}
impl MerkelizedChain {
pub fn new() -> Self {
let genesis = Self::calculate_hash("genesis");
Self {
id: SharedObjectId::new(),
chain: vec![genesis.clone()],
messages: vec![SharedMessage::new(
MessageType::Custom("genesis".to_string()),
serde_json::json!("genesis"),
)],
hash_set: {
let mut set = HashSet::new();
set.insert(genesis);
set
},
created_at: chrono::Utc::now(),
}
}
pub fn calculate_hash(data: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(data.as_bytes());
hex::encode(hasher.finalize())
}
pub fn calculate_next_hash(prev_hash: &str) -> String {
Self::calculate_hash(prev_hash)
}
pub fn chain_length(&self) -> usize {
self.chain.len()
}
pub fn genesis_hash(&self) -> &str {
&self.chain[0]
}
pub fn latest_hash(&self) -> &str {
self.chain.last().expect("chain is never empty")
}
pub fn hash_at(&self, index: usize) -> Option<&str> {
self.chain.get(index).map(|s| s.as_str())
}
pub fn is_valid_next_hash(&self, hash: &str) -> bool {
let expected = Self::calculate_next_hash(self.latest_hash());
hash == expected
}
pub fn add_next_hash(&mut self) -> String {
let next_hash = Self::calculate_next_hash(self.latest_hash());
self.chain.push(next_hash.clone());
self.hash_set.insert(next_hash.clone());
let msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(next_hash),
);
self.messages.push(msg);
next_hash
}
pub fn try_add_hash(&mut self, hash: &str) -> bool {
if self.hash_set.contains(hash) {
return false;
}
for i in 0..self.chain.len() {
let expected_next = Self::calculate_next_hash(&self.chain[i]);
if hash == expected_next {
self.chain.push(hash.to_string());
self.hash_set.insert(hash.to_string());
let msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(hash),
);
self.messages.push(msg);
return true;
}
}
false
}
pub fn chain(&self) -> &[String] {
&self.chain
}
pub fn find_hash_index(&self, hash: &str) -> Option<usize> {
self.chain.iter().position(|h| h == hash)
}
}
impl Default for MerkelizedChain {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ApplicationObject for MerkelizedChain {
fn id(&self) -> &SharedObjectId {
&self.id
}
fn type_name(&self) -> &'static str {
"MerkelizedChain"
}
async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
let Some(hash) = message.data.as_str() else {
return Ok(false);
};
if self.hash_set.contains(hash) {
return Ok(true);
}
for i in 0..self.chain.len() {
let expected_next = Self::calculate_next_hash(&self.chain[i]);
if hash == expected_next {
return Ok(true);
}
}
Ok(false)
}
async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
let Some(hash) = message.data.as_str() else {
return Ok(());
};
if self.hash_set.contains(hash) {
return Ok(());
}
if self.try_add_hash(hash) {
tracing::info!(
"MerkelizedChain: Added hash {} to chain (length: {})",
&hash[..8.min(hash.len())],
self.chain.len()
);
}
Ok(())
}
fn is_merkleized(&self) -> bool {
true
}
async fn get_latest_digest(&self) -> Result<String> {
Ok(self.latest_hash().to_string())
}
async fn has_digest(&self, digest: &str) -> Result<bool> {
Ok(self.hash_set.contains(digest))
}
async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
Ok(self.hash_set.contains(digest) || self.is_valid_next_hash(digest))
}
async fn add_digest(&mut self, digest: String) -> Result<bool> {
if self.try_add_hash(&digest) {
Ok(true)
} else {
Ok(false)
}
}
async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
let start_index = match digest {
Some(hash) => {
match self.find_hash_index(hash) {
Some(idx) => idx + 1, None => return Ok(Vec::new()), }
},
None => 1, };
let mut result = Vec::new();
for i in start_index..self.chain.len() {
let msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(self.chain[i]),
);
result.push(msg);
}
Ok(result)
}
async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
self.gossip_messages(Some(digest)).await
}
async fn get_state(&self) -> Result<Value> {
Ok(serde_json::json!({
"chain_length": self.chain.len(),
"latest_hash": self.latest_hash(),
"genesis_hash": self.genesis_hash(),
}))
}
async fn reset(&mut self) -> Result<()> {
let genesis = Self::calculate_hash("genesis");
self.chain = vec![genesis.clone()];
self.hash_set = {
let mut set = HashSet::new();
set.insert(genesis);
set
};
self.messages = vec![SharedMessage::new(
MessageType::Custom("genesis".to_string()),
serde_json::json!("genesis"),
)];
Ok(())
}
fn clone_box(&self) -> Box<dyn ApplicationObject> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
#[derive(Debug, Clone)]
pub struct MessageChain {
id: SharedObjectId,
messages: Vec<SharedMessage>,
seen_hashes: HashSet<String>,
digests: Vec<String>,
}
impl MessageChain {
pub fn new() -> Self {
Self {
id: SharedObjectId::new(),
messages: Vec::new(),
seen_hashes: HashSet::new(),
digests: Vec::new(),
}
}
pub fn len(&self) -> usize {
self.messages.len()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
pub fn messages(&self) -> &[SharedMessage] {
&self.messages
}
fn msg_hash(msg: &SharedMessage) -> String {
msg.hash.clone()
}
}
impl Default for MessageChain {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ApplicationObject for MessageChain {
fn id(&self) -> &SharedObjectId {
&self.id
}
fn type_name(&self) -> &'static str {
"MessageChain"
}
async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
Ok(!message.data.is_null())
}
async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
let h = Self::msg_hash(&message);
if self.seen_hashes.contains(&h) {
return Ok(());
}
self.seen_hashes.insert(h);
self.messages.push(message);
Ok(())
}
fn is_merkleized(&self) -> bool {
true
}
async fn get_latest_digest(&self) -> Result<String> {
Ok(self
.messages
.last()
.map(|m| m.hash.clone())
.unwrap_or_else(|| "genesis".to_string()))
}
async fn has_digest(&self, digest: &str) -> Result<bool> {
Ok(self.digests.contains(&digest.to_string())
|| self.messages.iter().any(|m| m.hash == digest))
}
async fn is_valid_digest(&self, digest: &str) -> Result<bool> {
Ok(self.has_digest(digest).await?
|| digest == "genesis"
|| self.seen_hashes.contains(digest))
}
async fn add_digest(&mut self, _digest: String) -> Result<bool> {
Ok(false)
}
async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>> {
let start = match digest {
Some(d) if d != "genesis" => self
.messages
.iter()
.position(|m| m.hash == d)
.map(|i| i + 1)
.unwrap_or(0),
_ => 0,
};
Ok(self.messages[start..].to_vec())
}
async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>> {
self.gossip_messages(Some(digest)).await
}
async fn get_state(&self) -> Result<Value> {
Ok(serde_json::json!({
"length": self.messages.len(),
"message_count": self.messages.len()
}))
}
async fn reset(&mut self) -> Result<()> {
self.messages.clear();
self.seen_hashes.clear();
self.digests.clear();
Ok(())
}
fn clone_box(&self) -> Box<dyn ApplicationObject> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
#[derive(Debug)]
pub struct ApplicationObjectRegistry {
pub objects: HashMap<SharedObjectId, Box<dyn ApplicationObject>>,
objects_by_type: HashMap<String, Vec<SharedObjectId>>,
}
impl ApplicationObjectRegistry {
pub fn new() -> Self {
Self {
objects: HashMap::new(),
objects_by_type: HashMap::new(),
}
}
pub fn register(&mut self, object: Box<dyn ApplicationObject>) -> SharedObjectId {
let id = object.id().clone();
let type_name = object.type_name().to_string();
self.objects_by_type
.entry(type_name)
.or_default()
.push(id.clone());
self.objects.insert(id.clone(), object);
id
}
pub fn get(&self, id: &SharedObjectId) -> Option<&dyn ApplicationObject> {
self.objects.get(id).map(|obj| obj.as_ref())
}
pub fn get_by_type(&self, type_name: &str) -> Vec<Box<dyn ApplicationObject>> {
self.objects_by_type
.get(type_name)
.map(|ids| {
ids.iter()
.filter_map(|id| self.objects.get(id))
.map(|obj| obj.clone_box())
.collect()
})
.unwrap_or_default()
}
pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn ApplicationObject>> {
if let Some(object) = self.objects.remove(id) {
let type_name = object.type_name().to_string();
if let Some(type_list) = self.objects_by_type.get_mut(&type_name) {
type_list.retain(|obj_id| obj_id != id);
if type_list.is_empty() {
self.objects_by_type.remove(&type_name);
}
}
Some(object)
} else {
None
}
}
pub fn ids(&self) -> Vec<SharedObjectId> {
self.objects.keys().cloned().collect()
}
pub fn len(&self) -> usize {
self.objects.len()
}
pub fn is_empty(&self) -> bool {
self.objects.is_empty()
}
pub fn clear(&mut self) {
self.objects.clear();
self.objects_by_type.clear();
}
pub async fn process_message(&mut self, message: SharedMessage) -> Result<Vec<SharedObjectId>> {
let mut processed_objects = Vec::new();
let ids: Vec<SharedObjectId> = self.objects.keys().cloned().collect();
for id in ids {
let is_valid = if let Some(object) = self.objects.get(&id) {
object.is_valid(&message).await?
} else {
false
};
if is_valid {
if let Some(object) = self.objects.get_mut(&id) {
object.add_message(message.clone()).await?;
processed_objects.push(id);
}
}
}
Ok(processed_objects)
}
}
impl Default for ApplicationObjectRegistry {
fn default() -> Self {
Self::new()
}
}