chaincraft_rust/
shared_object.rs1pub use crate::shared::SharedObjectId;
4use crate::{
5 error::{ChaincraftError, Result},
6 shared::{MessageType, SharedMessage, SharedObject},
7};
8use async_trait::async_trait;
9use chrono;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha2::{Digest, Sha256};
13use std::any::Any;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18#[async_trait]
20pub trait ApplicationObject: Send + Sync + std::fmt::Debug {
21 fn id(&self) -> &SharedObjectId;
23
24 fn type_name(&self) -> &'static str;
26
27 async fn is_valid(&self, message: &SharedMessage) -> Result<bool>;
29
30 async fn add_message(&mut self, message: SharedMessage) -> Result<()>;
32
33 fn is_merkleized(&self) -> bool;
35
36 async fn get_latest_digest(&self) -> Result<String>;
38
39 async fn has_digest(&self, digest: &str) -> Result<bool>;
41
42 async fn is_valid_digest(&self, digest: &str) -> Result<bool>;
44
45 async fn add_digest(&mut self, digest: String) -> Result<bool>;
47
48 async fn gossip_messages(&self, digest: Option<&str>) -> Result<Vec<SharedMessage>>;
50
51 async fn get_messages_since_digest(&self, digest: &str) -> Result<Vec<SharedMessage>>;
53
54 async fn get_state(&self) -> Result<Value>;
56
57 async fn reset(&mut self) -> Result<()>;
59
60 fn clone_box(&self) -> Box<dyn ApplicationObject>;
62
63 fn as_any(&self) -> &dyn Any;
65
66 fn as_any_mut(&mut self) -> &mut dyn Any;
68}
69
70#[derive(Debug, Clone)]
72pub struct SimpleSharedNumber {
73 id: SharedObjectId,
74 number: i64,
75 created_at: chrono::DateTime<chrono::Utc>,
76 updated_at: chrono::DateTime<chrono::Utc>,
77 locked: bool,
78 messages: Vec<SharedMessage>,
79 seen_hashes: HashSet<String>,
80 digests: Vec<String>,
81}
82
83impl SimpleSharedNumber {
84 pub fn new() -> Self {
85 Self {
86 id: SharedObjectId::new(),
87 number: 0,
88 created_at: chrono::Utc::now(),
89 updated_at: chrono::Utc::now(),
90 locked: false,
91 messages: Vec::new(),
92 seen_hashes: HashSet::new(),
93 digests: Vec::new(),
94 }
95 }
96
97 pub fn get_number(&self) -> i64 {
98 self.number
99 }
100
101 pub fn get_messages(&self) -> &[SharedMessage] {
102 &self.messages
103 }
104
105 fn calculate_message_hash(data: &Value) -> String {
106 let data_str = serde_json::to_string(&serde_json::json!({
107 "content": data
108 }))
109 .unwrap_or_default();
110 let mut hasher = Sha256::new();
111 hasher.update(data_str.as_bytes());
112 hex::encode(hasher.finalize())
113 }
114}
115
116impl Default for SimpleSharedNumber {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122#[async_trait]
123impl ApplicationObject for SimpleSharedNumber {
124 fn id(&self) -> &SharedObjectId {
125 &self.id
126 }
127
128 fn type_name(&self) -> &'static str {
129 "SimpleSharedNumber"
130 }
131
132 async fn is_valid(&self, message: &SharedMessage) -> Result<bool> {
133 Ok(message.data.is_i64())
135 }
136
137 async fn add_message(&mut self, message: SharedMessage) -> Result<()> {
138 let msg_hash = Self::calculate_message_hash(&message.data);
140
141 if self.seen_hashes.contains(&msg_hash) {
142 return Ok(());
144 }
145
146 self.seen_hashes.insert(msg_hash);
147
148 if let Some(value) = message.data.as_i64() {
150 self.number += value;
151 self.messages.push(message);
152 tracing::info!("SimpleSharedNumber: Added message with data: {}", value);
153 }
154
155 Ok(())
156 }
157
158 fn is_merkleized(&self) -> bool {
159 false
160 }
161
162 async fn get_latest_digest(&self) -> Result<String> {
163 Ok(self.number.to_string())
164 }
165
166 async fn has_digest(&self, digest: &str) -> Result<bool> {
167 Ok(self.digests.contains(&digest.to_string()))
168 }
169
170 async fn is_valid_digest(&self, _digest: &str) -> Result<bool> {
171 Ok(true)
172 }
173
174 async fn add_digest(&mut self, digest: String) -> Result<bool> {
175 self.digests.push(digest);
176 Ok(true)
177 }
178
179 async fn gossip_messages(&self, _digest: Option<&str>) -> Result<Vec<SharedMessage>> {
180 Ok(Vec::new())
181 }
182
183 async fn get_messages_since_digest(&self, _digest: &str) -> Result<Vec<SharedMessage>> {
184 Ok(Vec::new())
185 }
186
187 async fn get_state(&self) -> Result<Value> {
188 Ok(serde_json::json!({
189 "number": self.number,
190 "message_count": self.messages.len(),
191 "seen_hashes_count": self.seen_hashes.len()
192 }))
193 }
194
195 async fn reset(&mut self) -> Result<()> {
196 self.number = 0;
197 self.messages.clear();
198 self.seen_hashes.clear();
199 self.digests.clear();
200 Ok(())
201 }
202
203 fn clone_box(&self) -> Box<dyn ApplicationObject> {
204 Box::new(self.clone())
205 }
206
207 fn as_any(&self) -> &dyn Any {
208 self
209 }
210
211 fn as_any_mut(&mut self) -> &mut dyn Any {
212 self
213 }
214}
215
216#[derive(Debug)]
218pub struct ApplicationObjectRegistry {
219 objects: HashMap<SharedObjectId, Box<dyn ApplicationObject>>,
220 objects_by_type: HashMap<String, Vec<SharedObjectId>>,
221}
222
223impl ApplicationObjectRegistry {
224 pub fn new() -> Self {
225 Self {
226 objects: HashMap::new(),
227 objects_by_type: HashMap::new(),
228 }
229 }
230
231 pub fn register(&mut self, object: Box<dyn ApplicationObject>) -> SharedObjectId {
233 let id = object.id().clone();
234 let type_name = object.type_name().to_string();
235
236 self.objects_by_type
237 .entry(type_name)
238 .or_default()
239 .push(id.clone());
240
241 self.objects.insert(id.clone(), object);
242 id
243 }
244
245 pub fn get(&self, id: &SharedObjectId) -> Option<&dyn ApplicationObject> {
247 self.objects.get(id).map(|obj| obj.as_ref())
248 }
249
250 pub fn get_by_type(&self, type_name: &str) -> Vec<Box<dyn ApplicationObject>> {
252 self.objects_by_type
253 .get(type_name)
254 .map(|ids| {
255 ids.iter()
256 .filter_map(|id| self.objects.get(id))
257 .map(|obj| obj.clone_box())
258 .collect()
259 })
260 .unwrap_or_default()
261 }
262
263 pub fn remove(&mut self, id: &SharedObjectId) -> Option<Box<dyn ApplicationObject>> {
265 if let Some(object) = self.objects.remove(id) {
266 let type_name = object.type_name().to_string();
267 if let Some(type_list) = self.objects_by_type.get_mut(&type_name) {
268 type_list.retain(|obj_id| obj_id != id);
269 if type_list.is_empty() {
270 self.objects_by_type.remove(&type_name);
271 }
272 }
273 Some(object)
274 } else {
275 None
276 }
277 }
278
279 pub fn ids(&self) -> Vec<SharedObjectId> {
281 self.objects.keys().cloned().collect()
282 }
283
284 pub fn len(&self) -> usize {
286 self.objects.len()
287 }
288
289 pub fn is_empty(&self) -> bool {
291 self.objects.is_empty()
292 }
293
294 pub fn clear(&mut self) {
296 self.objects.clear();
297 self.objects_by_type.clear();
298 }
299
300 pub async fn process_message(&mut self, message: SharedMessage) -> Result<Vec<SharedObjectId>> {
302 let mut processed_objects = Vec::new();
303
304 let ids: Vec<SharedObjectId> = self.objects.keys().cloned().collect();
306
307 for id in ids {
309 let is_valid = if let Some(object) = self.objects.get(&id) {
311 object.is_valid(&message).await?
312 } else {
313 false
314 };
315
316 if is_valid {
318 if let Some(object) = self.objects.get_mut(&id) {
319 object.add_message(message.clone()).await?;
320 processed_objects.push(id);
321 }
322 }
323 }
324
325 Ok(processed_objects)
326 }
327}
328
329impl Default for ApplicationObjectRegistry {
330 fn default() -> Self {
331 Self::new()
332 }
333}