1use std::collections::HashMap;
2
3use flarch::{
4 broker::SubsystemHandler,
5 data_storage::DataStorage,
6 nodeids::{NodeID, U256},
7 platform_async_trait,
8};
9use flcrypto::tofrombytes::ToFromBytes;
10use serde::{Deserialize, Serialize};
11use tokio::sync::watch;
12
13use crate::{
14 dht_router::broker::{DHTRouterIn, DHTRouterOut},
15 flo::{
16 flo::{Flo, FloID},
17 realm::{FloRealm, GlobalID, RealmID},
18 },
19 router::messages::NetworkWrapper,
20};
21
22use super::{
23 broker::{DHTStorageIn, DHTStorageOut, MODULE_NAME},
24 core::*,
25};
26
27#[derive(Debug, Clone, PartialEq)]
29pub(super) enum InternIn {
30 Routing(DHTRouterOut),
31 Storage(DHTStorageIn),
32}
33
34#[derive(Debug, Clone, PartialEq)]
35pub(super) enum InternOut {
36 Routing(DHTRouterIn),
37 Storage(DHTStorageOut),
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub enum MessageClosest {
44 StoreFlo(Flo),
47 ReadFlo(RealmID),
49 GetCuckooIDs(RealmID),
51 StoreCuckooID(GlobalID),
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
60pub enum MessageDest {
61 FloValue(FloCuckoo),
63 UnknownFlo(GlobalID),
65 CookooIDs(GlobalID, Vec<FloID>),
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70pub enum MessageNeighbour {
71 RequestRealmIDs,
72 AvailableRealmIDs(Vec<RealmID>),
73 RequestFloMetas(RealmID),
74 AvailableFlos(RealmID, Vec<FloMeta>),
75 RequestFlos(RealmID, Vec<FloID>),
76 Flos(Vec<FloCuckoo>),
77}
78
79#[derive(Debug, Clone)]
80pub struct RealmStats {
81 pub real_size: usize,
82 pub size: u64,
83 pub flos: usize,
84 pub distribution: Vec<usize>,
85 pub config: RealmConfig,
86}
87
88#[derive(Debug, Default, Clone)]
89pub struct Stats {
90 pub realm_stats: HashMap<RealmID, RealmStats>,
91 pub system_realms: Vec<RealmID>,
92}
93
94pub struct Messages {
96 realms: HashMap<RealmID, RealmStorage>,
97 nodes: Vec<NodeID>,
98 config: DHTConfig,
99 ds: Box<dyn DataStorage + Send>,
100 tx: Option<watch::Sender<Stats>>,
101}
102
103impl Messages {
104 pub fn new(
106 ds: Box<dyn DataStorage + Send>,
107 config: DHTConfig,
108 ) -> (Self, watch::Receiver<Stats>) {
109 let str = ds.get(MODULE_NAME).unwrap_or("".into());
110 let realms = serde_yaml::from_str(&str).unwrap_or(HashMap::new());
111 let (tx, rx) = watch::channel(Stats::default());
112 let mut msgs = Self {
113 realms,
114 config,
115 nodes: vec![],
116 ds,
117 tx: Some(tx),
118 };
119 msgs.store();
120 (msgs, rx)
121 }
122
123 fn msg_dht_router(&mut self, msg: DHTRouterOut) -> Vec<InternOut> {
124 match msg {
125 DHTRouterOut::MessageRouting(origin, _last_hop, _next_hop, key, msg) => msg
126 .unwrap_yaml(MODULE_NAME)
127 .map(|mn| self.msg_routing(false, origin, key, mn)),
128 DHTRouterOut::MessageClosest(origin, _last_hop, key, msg) => msg
129 .unwrap_yaml(MODULE_NAME)
130 .map(|mn| self.msg_routing(true, origin, key, mn)),
131 DHTRouterOut::MessageDest(_origin, _last_hop, msg) => {
132 msg.unwrap_yaml(MODULE_NAME).map(|mn| self.msg_dest(mn))
133 }
134 DHTRouterOut::NodeList(nodes) => {
135 self.nodes = nodes;
136 None
137 }
138 DHTRouterOut::MessageNeighbour(origin, msg) => msg
139 .unwrap_yaml(MODULE_NAME)
140 .map(|mn| self.msg_neighbour(origin, mn)),
141 DHTRouterOut::SystemRealm(realm_id) => {
142 if !self.config.realms.contains(&realm_id) {
143 self.config.realms.push(realm_id);
144 }
145 None
146 }
147 }
148 .unwrap_or(vec![])
149 }
150
151 fn msg_dht_storage(&mut self, msg: DHTStorageIn) -> Vec<InternOut> {
152 match msg {
154 DHTStorageIn::StoreFlo(flo) => self.store_flo(flo),
155 DHTStorageIn::ReadFlo(id) => vec![match self.read_flo(&id) {
156 Some(df) => DHTStorageOut::FloValue(df.clone()).into(),
157 None => MessageClosest::ReadFlo(id.realm_id().clone())
158 .to_intern_out(id.flo_id().clone().into())
159 .expect("Creating ReadFlo message"),
161 }],
162 DHTStorageIn::ReadCuckooIDs(id) => {
163 let mut out: Vec<InternOut> = self
164 .realms
165 .get(id.realm_id())
166 .and_then(|realm| realm.get_cuckoo_ids(id.flo_id()))
167 .map(|cids| vec![DHTStorageOut::CuckooIDs(id.clone(), cids).into()])
168 .unwrap_or_default();
169 out.push(
170 MessageClosest::GetCuckooIDs(id.realm_id().clone())
171 .to_intern_out(id.flo_id().clone().into())
172 .expect("Creating GetCuckoos"),
173 );
174 out
175 }
176 DHTStorageIn::SyncFromNeighbors => vec![MessageNeighbour::RequestRealmIDs
177 .to_broadcast()
178 .expect("Creating Request broadcast")],
179 DHTStorageIn::GetRealms => {
180 vec![DHTStorageOut::RealmIDs(self.realms.keys().cloned().collect()).into()]
181 }
182 DHTStorageIn::GetFlos => {
183 vec![DHTStorageOut::FloValues(
184 self.realms
185 .iter()
186 .flat_map(|realm| realm.1.get_all_flo_cuckoos())
187 .collect::<Vec<_>>(),
188 )
189 .into()]
190 }
191 DHTStorageIn::PropagateFlos => {
192 MessageNeighbour::AvailableRealmIDs(self.realms.keys().cloned().collect())
193 .to_broadcast()
194 .map_or(vec![], |msg| vec![msg])
195 }
196 }
197 }
198
199 fn msg_routing(
200 &mut self,
201 _closest: bool,
202 origin: NodeID,
203 key: U256,
204 msg: MessageClosest,
205 ) -> Vec<InternOut> {
206 let fid: FloID = key.into();
207 match msg {
208 MessageClosest::StoreFlo(flo) => {
209 return self.store_flo(flo);
210 }
211 MessageClosest::ReadFlo(rid) => {
212 if let Some(fc) = self
221 .realms
222 .get(&rid)
223 .and_then(|realm| realm.get_flo_cuckoo(&fid))
224 {
225 return MessageDest::FloValue(fc)
227 .to_intern_out(origin)
228 .map_or(vec![], |msg| vec![msg]);
230 }
231 }
232 MessageClosest::GetCuckooIDs(rid) => {
233 let parent = GlobalID::new(rid.clone(), fid.clone());
234 return self
235 .realms
236 .get(&rid)
237 .and_then(|realm| realm.get_cuckoo_ids(&fid))
238 .map_or(vec![], |ids| {
239 MessageDest::CookooIDs(parent, ids)
240 .to_intern_out(origin)
241 .map_or(vec![], |msg| vec![msg])
242 });
243 }
244 MessageClosest::StoreCuckooID(gid) => {
245 self.realms
246 .get_mut(&gid.realm_id())
247 .map(|realm| realm.store_cuckoo_id(&fid, gid.flo_id().clone()));
248 }
249 }
250 vec![]
251 }
252
253 fn msg_dest(&mut self, msg: MessageDest) -> Vec<InternOut> {
254 match msg {
255 MessageDest::FloValue(fc) => {
256 self.store_flo(fc.0.clone());
258 self.realms
259 .get_mut(&fc.0.realm_id())
260 .map(|realm| realm.store_cuckoo_ids(&fc.0.flo_id(), fc.1.clone()));
261 Some(DHTStorageOut::FloValue(fc).into())
262 }
263 MessageDest::UnknownFlo(key) => Some(DHTStorageOut::ValueMissing(key).into()),
264 MessageDest::CookooIDs(gid, cids) => Some(DHTStorageOut::CuckooIDs(gid, cids).into()),
265 }
266 .map_or(vec![], |msg| vec![msg])
267 }
268
269 fn msg_neighbour(&mut self, origin: NodeID, msg: MessageNeighbour) -> Vec<InternOut> {
270 match msg {
289 MessageNeighbour::RequestRealmIDs => vec![MessageNeighbour::AvailableRealmIDs(
290 self.realms.keys().cloned().collect(),
291 )],
292 MessageNeighbour::AvailableRealmIDs(realm_ids) => {
293 let accepted_realms = realm_ids
294 .into_iter()
295 .filter(|rid| self.config.accepts_realm(&rid))
296 .collect::<Vec<_>>();
297 accepted_realms
298 .iter()
299 .filter(|id| !self.realms.contains_key(id))
300 .map(|rid| MessageNeighbour::RequestFlos(rid.clone(), vec![(**rid).into()]))
301 .chain(
302 accepted_realms
303 .iter()
304 .map(|rid| MessageNeighbour::RequestFloMetas(rid.clone())),
305 )
306 .collect()
307 }
308 MessageNeighbour::RequestFloMetas(realm_id) => self
309 .realms
310 .get(&realm_id)
311 .map(|realm| realm.get_flo_metas())
312 .map_or(vec![], |fm| {
313 vec![MessageNeighbour::AvailableFlos(realm_id, fm)]
314 }),
315 MessageNeighbour::AvailableFlos(realm_id, flo_metas) => self
316 .realms
317 .get(&realm_id)
318 .and_then(|realm| realm.sync_available(&flo_metas))
319 .map_or(vec![], |needed| {
320 vec![MessageNeighbour::RequestFlos(realm_id, needed)]
321 }),
322 MessageNeighbour::RequestFlos(realm_id, flo_ids) => self
323 .realms
324 .get(&realm_id)
325 .map(|realm| {
326 flo_ids
327 .iter()
328 .filter_map(|id| realm.get_flo_cuckoo(id))
329 .collect::<Vec<_>>()
330 })
331 .map_or(vec![], |flos| vec![MessageNeighbour::Flos(flos)]),
332 MessageNeighbour::Flos(flo_cuckoos) => {
333 for (flo, cuckoos) in flo_cuckoos {
334 self.store_flo(flo.clone());
335 self.realms.get_mut(&flo.realm_id()).map(|realm| {
336 cuckoos
337 .into_iter()
338 .for_each(|cuckoo| realm.store_cuckoo_id(&flo.flo_id(), cuckoo))
339 });
340 }
341 vec![]
342 }
343 }
344 .into_iter()
345 .filter_map(|msg| msg.to_neighbour(origin))
346 .collect()
347 }
348
349 fn read_flo(&self, id: &GlobalID) -> Option<FloCuckoo> {
350 self.realms
351 .get(id.realm_id())
352 .and_then(|realm| realm.get_flo_cuckoo(id.flo_id()))
353 }
354
355 fn store_flo(&mut self, flo: Flo) -> Vec<InternOut> {
356 let mut res = vec![];
357 if self.upsert_flo(flo.clone()) {
358 res.extend(vec![MessageClosest::StoreFlo(flo.clone())
361 .to_intern_out(flo.flo_id().into())
362 .expect("Storing new DHT")]);
363 }
364 if let Some(parent) = flo.flo_config().cuckoo_parent() {
365 res.extend(vec![MessageClosest::StoreCuckooID(flo.global_id())
366 .to_intern_out(*parent.clone())
367 .expect("Storing new Cuckoo")])
368 }
369 res
370 }
371
372 fn upsert_flo(&mut self, flo: Flo) -> bool {
376 let modification = self
396 .realms
397 .get_mut(&flo.realm_id())
398 .map(|dsc| dsc.upsert_flo(flo.clone()))
400 .unwrap_or_else(|| {
401 TryInto::<FloRealm>::try_into(flo)
402 .ok()
403 .and_then(|realm| self.create_realm(realm).ok())
404 .is_some()
405 });
406
407 if modification {
408 self.store();
409 }
410 modification
411 }
412
413 fn create_realm(&mut self, realm: FloRealm) -> anyhow::Result<()> {
414 if !self.config.accepts_realm(&realm.realm_id()) {
415 return Err(CoreError::RealmNotAccepted.into());
416 }
417 let id = realm.flo().realm_id();
424 let dsc = RealmStorage::new(self.config.clone(), realm)?;
425 self.realms.insert(id.clone(), dsc);
426 Ok(())
428 }
429
430 fn store(&mut self) {
431 self.tx.clone().map(|tx| {
432 tx.send(Stats::from_realms(&self.realms, self.config.realms.clone()))
433 .is_err()
434 .then(|| self.tx = None)
435 });
436 serde_yaml::to_string(&self.realms)
437 .ok()
438 .map(|s| (*self.ds).set(MODULE_NAME, &s));
439 }
440}
441
442#[platform_async_trait()]
443impl SubsystemHandler<InternIn, InternOut> for Messages {
444 async fn messages(&mut self, inputs: Vec<InternIn>) -> Vec<InternOut> {
445 let _id = self.config.our_id;
446 inputs
447 .into_iter()
448 .inspect(|msg| log::trace!("DHTStorageIn: {msg:?}"))
449 .flat_map(|msg| match msg {
450 InternIn::Routing(dhtrouting_out) => self.msg_dht_router(dhtrouting_out),
451 InternIn::Storage(dhtstorage_in) => self.msg_dht_storage(dhtstorage_in),
452 })
453 .inspect(|msg| log::trace!("DHTStorageOut: {msg:?}"))
454 .collect()
455 }
456}
457
458impl Stats {
459 fn from_realms(realms: &HashMap<RealmID, RealmStorage>, system_realms: Vec<RealmID>) -> Self {
460 Self {
461 realm_stats: realms
462 .iter()
463 .map(|(id, realm)| (id.clone(), RealmStats::from_realm(realm)))
464 .collect(),
465 system_realms,
466 }
467 }
468}
469
470impl RealmStats {
471 fn from_realm(realm: &RealmStorage) -> Self {
472 Self {
473 real_size: realm.size(),
474 size: realm.size,
475 flos: realm.flo_count(),
476 distribution: realm.flo_distribution(),
477 config: realm.realm_config().clone(),
478 }
479 }
480}
481
482impl MessageClosest {
483 fn to_intern_out(&self, dst: NodeID) -> Option<InternOut> {
484 NetworkWrapper::wrap_yaml(MODULE_NAME, self)
485 .ok()
486 .map(|msg_wrap| InternOut::Routing(DHTRouterIn::MessageClosest(dst, msg_wrap)))
487 }
488}
489
490impl MessageDest {
491 fn to_intern_out(&self, dst: NodeID) -> Option<InternOut> {
492 NetworkWrapper::wrap_yaml(MODULE_NAME, self)
493 .ok()
494 .map(|msg_wrap| InternOut::Routing(DHTRouterIn::MessageDirect(dst, msg_wrap)))
495 }
496}
497
498impl From<DHTStorageOut> for InternOut {
499 fn from(value: DHTStorageOut) -> Self {
500 InternOut::Storage(value)
501 }
502}
503
504impl MessageNeighbour {
505 fn to_neighbour(self, dst: NodeID) -> Option<InternOut> {
506 (match &self {
507 MessageNeighbour::AvailableRealmIDs(realm_ids) => realm_ids.len(),
508 MessageNeighbour::AvailableFlos(_, flo_metas) => flo_metas.len(),
509 MessageNeighbour::RequestFlos(_, flo_ids) => flo_ids.len(),
510 MessageNeighbour::Flos(items) => items.len(),
511 _ => 1,
512 } > 0)
513 .then(|| {
514 NetworkWrapper::wrap_yaml(MODULE_NAME, &self)
515 .ok()
516 .map(|msg_wrap| {
517 InternOut::Routing(DHTRouterIn::MessageNeighbour(dst, msg_wrap))
518 })
519 })
520 .flatten()
521 }
522
523 fn to_broadcast(self) -> Option<InternOut> {
524 (match &self {
525 MessageNeighbour::AvailableRealmIDs(realm_ids) => realm_ids.len(),
526 MessageNeighbour::AvailableFlos(_, flo_metas) => flo_metas.len(),
527 MessageNeighbour::RequestFlos(_, flo_ids) => flo_ids.len(),
528 MessageNeighbour::Flos(items) => items.len(),
529 _ => 1,
530 } > 0)
531 .then(|| {
532 NetworkWrapper::wrap_yaml(MODULE_NAME, &self)
533 .ok()
534 .map(|msg_wrap| InternOut::Routing(DHTRouterIn::MessageBroadcast(msg_wrap)))
535 })
536 .flatten()
537 }
538}
539
#[cfg(test)]
mod tests {
    use flarch::{data_storage::DataStorageTemp, start_logging};
    use flcrypto::{access::Condition, signer_ed25519::SignerEd25519};

    use crate::{
        flo::realm::Realm,
        testing::wallet::{FloTesting, Wallet},
    };

    use super::*;

    /// Creates a realm called "root" with a random ID, an always-passing
    /// condition and the given configuration.
    fn root_realm(config: RealmConfig) -> anyhow::Result<FloRealm> {
        let realm = FloRealm::from_type(
            RealmID::rnd(),
            Condition::Pass,
            Realm::new("root".into(), config),
            &[],
        )?;
        Ok(realm)
    }

    /// A stored realm flo must be answered directly from the local storage.
    #[test]
    fn test_choice() -> anyhow::Result<()> {
        let our_id = NodeID::rnd();
        let (mut dht, _stats) =
            Messages::new(Box::new(DataStorageTemp::new()), DHTConfig::default(our_id));

        let mut wallet = Wallet::new();
        let realm = wallet.get_realm();
        dht.msg_dht_storage(DHTStorageIn::StoreFlo(realm.flo().clone()));

        let out = dht.msg_dht_storage(DHTStorageIn::ReadFlo(realm.global_id()));
        assert_eq!(1, out.len());
        let expected = InternOut::Storage(DHTStorageOut::FloValue((realm.flo().clone(), vec![])));
        assert_eq!(out.first(), Some(&expected));
        Ok(())
    }

    /// A realm flo must survive being wrapped into a `MessageDest` and
    /// unwrapped again.
    #[test]
    fn serialize() -> anyhow::Result<()> {
        let fr = root_realm(RealmConfig {
            max_space: 1000,
            max_flo_size: 1000,
        })?;

        let out = NetworkWrapper::wrap_yaml(
            MODULE_NAME,
            &MessageDest::FloValue((fr.flo().clone(), vec![])),
        )
        .unwrap();

        let decoded: MessageDest = out.unwrap_yaml(MODULE_NAME).unwrap();
        match decoded {
            MessageDest::FloValue(flo) => {
                let fr2 = TryInto::<FloRealm>::try_into(flo.0)?;
                assert_eq!(fr, fr2);
            }
            _ => return Err(anyhow::anyhow!("Didn't find message")),
        }
        Ok(())
    }

    /// Storing a flo with a cuckoo parent must register it with the parent.
    #[test]
    fn store_cuckoos() -> anyhow::Result<()> {
        start_logging();
        let (mut msg, _stats) = Messages::new(
            DataStorageTemp::new_box(),
            DHTConfig::default(NodeID::rnd()),
        );
        let fr = root_realm(RealmConfig {
            max_space: 10000,
            max_flo_size: 1000,
        })?;
        msg.msg_dht_storage(DHTStorageIn::StoreFlo(fr.clone().into()));

        let signer = SignerEd25519::new();
        let fb_root =
            FloTesting::new_cuckoo(fr.realm_id(), "data", Cuckoo::Duration(100000), &signer);
        msg.msg_dht_storage(DHTStorageIn::StoreFlo(fb_root.clone().into()));
        let fb_cu = FloTesting::new_cuckoo(
            fr.realm_id(),
            "data2",
            Cuckoo::Parent(fb_root.flo_id()),
            &signer,
        );
        let ans = msg.msg_dht_storage(DHTStorageIn::StoreFlo(fb_cu.clone().into()));
        log::info!("{ans:?}");

        let ans = msg.msg_dht_storage(DHTStorageIn::ReadCuckooIDs(fb_root.global_id()));
        match ans.first() {
            Some(InternOut::Storage(DHTStorageOut::CuckooIDs(_, cs))) => {
                assert_eq!(Some(&fb_cu.flo_id()), cs.first());
            }
            _ => panic!("Answer should be CuckooIDs"),
        }

        Ok(())
    }
}