1use std::time::Duration;
2use std::sync::atomic::Ordering;
3
4#[cfg(target_has_atomic = "64")]
5use std::sync::atomic::AtomicU64;
6
7#[cfg(not(target_has_atomic = "64"))]
8use portable_atomic::AtomicU64;
9
10use crypto_box::SecretKey;
11use dashmap::DashMap;
12use rusqlite::{
13 Error,
14 types::Type,
15 Result as SqlResult
16};
17use tracing::instrument;
18
19use crate::control::{
20 Conspirator as InlineConspirator,
21 CnsprcyStatus,
22 KeyArg,
23 NodeArg,
24 NodeReq
25};
26use crate::event::{
27 AnyEvent,
28 PeerEvent
29};
30use crate::message::{
31 self,
32 DynamicPayload,
33 Payload,
34 Packet,
35 PingOrPong,
36 ProtocolPayload
37};
38use crate::peer::{
39 Address,
40 AtomicClock,
41 data::TickResult,
42 Invitation,
43 induct::Content as InvitationContent,
44 NodeID as ID,
45 Peer,
46 PeerState,
47 Status
48};
49use crate::store::{
50 Store,
51 Action,
52 Config,
53 Conspirator,
54 Op
55};
56
57use crate::util::*;
58
/// The local node: its identity, its backing store, the channels it emits
/// packets and events on, and the in-memory table of peers it tracks.
#[derive(Debug)]
pub struct Node {
    /// ID of this node, read from the store's configuration in `load`.
    id: ID,
    /// Name of this node, taken from its own conspirator entry in the store.
    name: String,
    /// This node's clock; `packet` takes each packet's `clk` from `next()`.
    clk: AtomicClock,
    /// Cached result of `Store::hash`, refreshed by `update_hash`.
    hash: AtomicU64,
    /// Channel on which outgoing `(address, packet)` pairs are queued.
    outbound: Sender<PktTo>,
    /// Channel on which events are handed over for dispatching.
    dispatcher_sender: Sender<AnyEvent>,
    /// Backing store for configuration, ops, conspirators and addresses.
    db: Store,
    /// Known peers keyed by ID; the code in this file never inserts the
    /// node's own ID here.
    table: DashMap<ID, Peer>
}
70
/// Outcome of checking an incoming packet against the peer table and store.
pub enum ValidationResult {
    /// The packet was accepted and the sender has no pending address.
    Ok,
    /// The packet was accepted and the sender has a pending address
    /// (carried in the variant).
    NewAddress(Address),
    /// The sender is stored as a deactivated conspirator, but its packet
    /// clock is newer than the stored one.
    Disabled,
    /// The packet must be ignored.
    Invalid
}
79
80impl Node {
81
82#[instrument(skip_all, level = "debug")]
87 pub fn load(
88 db: Store,
89 outbound: Sender<PktTo>,
90 dispatcher_sender: Sender<AnyEvent>)
91 -> SqlResult<Option<Self>>
92 {
93 let own_id = db.read_config(Config::ID)
94 .inspect_err(|err| error!(%err, "failed to load own id"))?
95 .ok_or(Error::QueryReturnedNoRows)?
96 .parse::<ID>()
97 .map_err(|err| Error::FromSqlConversionFailure(
98 1,
99 Type::Text,
100 format!("failed to load own node ID: {err}").into()
101 ))?;
102 debug!(%own_id, "loaded");
103 let mut clock: Option<AtomicClock> = None;
104 let mut name: Option<String> = None;
105 let table = DashMap::new();
106 for (conspirator, addr_count) in db.get_conspirators()? {
107 debug!(?conspirator, "loading");
108 if conspirator.id == own_id {
109 clock = Some(AtomicClock::load(conspirator.clock));
110 name = Some(conspirator.name);
111 if !conspirator.active {
112 warn!("node has been disabled!");
113 return Ok(None);
114 }
115 }
116 else if conspirator.active {
117 let mut peer = Peer::load(
118 conspirator.id,
119 conspirator.name,
120 conspirator.clock,
121 conspirator.state,
122 addr_count
123 );
124 peer.update_address_count(addr_count);
125 table.insert(conspirator.id, peer);
126 }
127 }
128 let hash = db.hash()?.into();
129 let Some((name, clk)) = name.zip(clock) else {
130 error!(%own_id, "corrupt db: missing own entry among conspirators");
131 return Err(Error::QueryReturnedNoRows);
132 };
133
134 Ok(Some(Self {
135 id: own_id,
136 name,
137 clk,
138 hash,
139 outbound,
140 dispatcher_sender,
141 db,
142 table
143 }))
144 }
145
146 pub fn init(id: ID, name: String, db: Store)
147 -> SqlResult<(Self, Receiver<AnyEvent>, Receiver<PktTo>)>
148 {
149 let (dispatcher_sender, dispatcher_receiver)
151 = new_channel::<AnyEvent>();
152
153 let (outbound, outbound_packet_receiver)
155 = new_channel::<PktTo>();
156
157 db.write_config(Config::ID, &id.to_string())?;
158 let name_op = db.create_op(id, id, Action::Name(name))?;
159 db.absorb_op(name_op)?;
160 Self::load(db, outbound, dispatcher_sender).map(|node_opt| (
161 node_opt.expect("newly created node should be enabled"),
162 dispatcher_receiver,
163 outbound_packet_receiver
164 ))
165 }
166
/// Test-only: consumes the node and hands back its store.
#[cfg(test)]
pub fn stop(self) -> Store {
    let Self { db, .. } = self;
    db
}
171
172fn hash(&self) -> u64 {
179 self.hash.load(Ordering::Acquire)
180 }
181
182 fn update_hash(&self) {
183 match self.db.hash() {
184 Ok(hash) => self.hash.store(hash, Ordering::Release),
185 Err(err) => error!(%err, "failed to update hash")
186 }
187 }
188
189 fn vector_clock_for(&self, to: ID) -> Vec<PeerState> {
190 self.table
191 .iter()
192 .filter(|p| p.id != to)
193 .map(|p| p.get_peer_state())
194 .collect()
195 }
196
197 fn init_sync(&self) -> Payload {
198 match self.db.vector() {
199 Ok(counters) => ProtocolPayload::Sync {
200 counters,
201 ops: vec![] }.into(),
203 Err(err) => {
204 error!(%err, "failed to construct Sync payload");
205 Payload::None
206 }
207 }
208 }
209
210 #[instrument(skip_all, fields(peer = %new), level = "debug")]
213 fn add_new_peer(&self, mut new: Peer) -> bool {
214 if let Some(old) = self.table.get(&new.id) {
215 error!(old = ?old.value(), ?new, "attempted to clobber peer");
216 false
217 }
218 else if new.id == self.id {
219 error!(%self.id, %new.name, "tried to add peer with own id");
220 false
221 }
222 else if self.db.get::<Conspirator>(new.id)
223 .is_ok_and(|o| o.is_some_and(|c| !c.active))
224 {
225 warn!("tried to re-add disabled peer");
226 false
227 }
228 else {
229 if let Some(addr) = new.get_address() {
230 match self.db.new_address(new.id, addr) {
231 Ok(true) => new.new_addr_pending = true,
232 Ok(false) => {},
233 Err(err) => error!(
234 %err,
235 %addr,
236 "failed to determine whether address is new"
237 )
238 }
239 }
240 self.table.insert(new.id, new);
241 info!("added");
242 true
243 }
244 }
245
246 fn resolve_and_then<F, R>(&self, node_arg: NodeArg, f: F) -> Option<R>
247 where F: Fn(&mut Peer) -> R
248 {
249 match node_arg {
250 NodeArg::ID(id) => self.table
251 .get_mut(&id.into())
252 .map(|mut p| f(&mut p)),
253 NodeArg::Name(name) => self.table
254 .iter_mut()
255 .find(|p| p.name == name)
256 .map(|mut p| f(&mut p))
257 }
258 }
259
260 #[instrument(skip(self), level = "debug")]
261 fn add_address(&mut self, id: ID, addr: Address) -> SqlResult<Op> {
262 let op = self.db.create_op(self.id, id, Action::AddAddress(addr))?;
263 info!(?op, "created");
264 let _ = self.absorb([op.clone()].into_iter());
266
267 Ok(op)
268 }
269
270 #[must_use]
273 fn absorb(&mut self, ops: impl Iterator<Item = Op>) -> bool {
274 let mut unchanged = true;
275 for op in ops {
277 match self.db.absorb_op(op) {
278 Ok(true) => unchanged = false,
279 Ok(false) => {},
280 Err(err) => error!(%err, "failed to absorb op")
281 }
282 }
283 if unchanged {return true}
284
285 self.update_hash();
286
287 let conspirators = match self.db.get_conspirators() {
288 Ok(conspirators) => conspirators,
289 Err(err) => {
290 error!(%err, "failed to load conspirators");
291 return true
292 }
293 };
294 for (conspirator, addr_count) in conspirators {
295 if conspirator.id == self.id {
296 self.name = conspirator.name;
297 if !conspirator.active {return false}
298 }
299 else if !conspirator.active {
300 if let Some((_id, p)) = self.table.remove(&conspirator.id) {
301 info!(%p, "removed deactivated conspirator");
302 }
303 }
304 else {
305 let Some(mut p) = self.table.get_mut(&conspirator.id) else {
306 let peer = Peer::load(
307 conspirator.id,
308 conspirator.name,
309 conspirator.clock,
310 conspirator.state,
311 addr_count
312 );
313 self.table.insert(conspirator.id, peer);
314 continue;
315 };
316 p.update_address_count(addr_count);
318 p.name = conspirator.name;
319 p.new_addr_pending = p.get_pending_address()
320 .and_then(|address| self.db.new_address(p.id, address)
321 .map_err(|err| error!(
322 %err,
323 %address,
324 "can't tell if address is new, assuming no"
325 ))
326 .ok())
327 .inspect(|&pending| if !pending {
328 debug!(%p.id, "pending address cleared")
329 })
330 .unwrap_or(false);
331 }
332 }
333 true
334 }
335
336 #[instrument(skip(self), level = "debug")]
338 fn author_op(&mut self, target: ID, action: Action) -> bool {
339 match self.db.create_op(self.id, target, action) {
340 Ok(op) => {
341 let _ = self.absorb([op.clone()].into_iter());
343 self.broadcast(ProtocolPayload::Ops(vec![op]).into())
345 },
346 Err(err) => {
347 error!(%err, "failed to create op");
348 false
349 }
350 }
351 }
352
353 fn shutdown(&mut self) {
355 let (dispatcher_sender, _dispatcher_receiver)
357 = new_channel::<AnyEvent>();
358
359 let (outbound, _outbound_packet_receiver)
361 = new_channel::<PktTo>();
362
363 self.outbound = outbound;
364 self.dispatcher_sender = dispatcher_sender;
365 }
366
367fn send_packet(&self, pkt: PktTo) {
372 self.outbound
373 .send(pkt)
374 .unwrap_or_else(|pkt| error!(?pkt, "node failed to send packet"))
375 }
376
377 fn packet(&self, dst: ID, png: PingOrPong, pyl: Payload) -> Packet {
378 let clk = self.clk.next();
379 let hsh = self.hash();
380 let vec = self.vector_clock_for(dst);
381 Packet { dst, src: self.id, clk, hsh, vec, png, pyl }
382 }
383
384 fn ping(&self, to: ID, at: Address) {
385 let packet = self.packet(to, PingOrPong::Ping, Payload::None);
386 self.send_packet((at, packet));
387 }
388
389 fn send(&self, to: ID, pyl: Payload) {
390 let addr = self.table
391 .get_mut(&to)
392 .expect("send() to non-existent peer")
393 .get_address_to_ping()
394 .expect("send() to peer without address");
395 let packet = self.packet(to, PingOrPong::Ping, pyl);
396 self.send_packet((addr, packet));
397 }
398
399 fn respond(&self, to: ID, pyl: Payload) {
400 let addr = self.table
401 .get(&to)
402 .expect("respond to non-existent peer")
403 .get_address()
404 .expect("respond to peer without address");
405 let packet = self.packet(to, PingOrPong::Pong, pyl);
406 self.send_packet((addr, packet));
407 }
408
409 fn send_or_respond(&self, to: ID, png: PingOrPong, pyl: Payload) {
410 match png {
411 PingOrPong::Ping => self.send(to, pyl),
412 PingOrPong::Pong => self.respond(to, pyl)
413 }
414 }
415
416 fn broadcast(&self, pyl: Payload) -> bool {
417 let active_ids: Vec<ID> = self.table
418 .iter()
419 .filter(|p| p.is_active())
420 .map(|p| p.id)
421 .collect();
422 for active_id in active_ids {
423 self.send(active_id, pyl.clone())
424 }
425 let clocks = self.table.iter()
426 .map(|r| (*r.key(), r.value().get_peer_state().clk))
427 .chain([(self.id, self.clk.prev())]);
428 self.db.persist_clocks(clocks)
429 .map_err(|err| error!(%err, "failed to persist clocks"))
430 .is_ok()
431 }
432
433 fn dispatch_event<E: Into<AnyEvent>>(&self, event: E) {
434 self.dispatcher_sender
435 .send(event.into())
436 .unwrap_or_else(|evt| error!(?evt, "node failed to send event"))
437 }
438
/// Executes a control request and answers it through the replier that is
/// bundled with the request.
///
/// Several error paths (`Advertise`, `Invite`, and `GetConspirator` for an
/// unknown peer) return without calling the replier at all.
pub fn handle_request(&mut self, req: NodeReq) {
    match req {
        // Author a write op on the node's own ID.
        NodeReq::Write(((key, value), replier)) => {
            replier.reply(
                self.author_op(self.id, Action::Write {key, value})
            );
        },
        // Record an address for a peer that is present in the table.
        NodeReq::AddAddress(((arg, addr), replier)) => {
            let known_id = match &arg {
                NodeArg::ID(id) => self.table.contains_key(&id.get())
                    .then_some(id.get()),
                NodeArg::Name(n) => self.table.iter()
                    .find(|p| &p.name == n)
                    .map(|p| p.id)
            };
            let Some(id) = known_id else {
                error!(%arg, "no such node");
                replier.reply(false);
                return;
            };
            match self.db.new_address(id, addr) {
                Ok(true) => match self.add_address(id, addr) {
                    Ok(_op) => replier.reply(true),
                    Err(err) => {
                        error!(%addr, %err, "failed to add address");
                        replier.reply(false)
                    }
                },
                // The store does not consider the address new: nothing to
                // author, reported as success.
                Ok(false) => replier.reply(true),
                Err(err) => {
                    error!(%addr, %err, "failed to add address");
                    replier.reply(false)
                }
            }
        },
        // Generate a fresh private key, store it, reply with its public key.
        NodeReq::Advertise(req) => {
            let private_key = KeyArg::random();
            let res = self.db.write_config(
                Config::PRIVATE_KEY,
                &private_key.to_string()
            );
            match res {
                Ok(true) => {},
                Ok(false) => warn!("unexpected number of changed rows"),
                Err(err) => {
                    error!(?err, "failed to store new private key");
                    return;
                }
            }
            let public_key = SecretKey::from_bytes(private_key.get())
                .public_key()
                .to_bytes();
            req.reply(public_key);
        },
        // Encrypt an invitation (own id, name, stored key, given addresses)
        // for the holder of `pubkey`.
        NodeReq::Invite(((pubkey, addresses), replier)) => {
            let key = match self.db.read_config(Config::KEY) {
                Ok(Some(key_str)) => match key_str.parse() {
                    Ok(key) => key,
                    Err(err) => {
                        error!(?err, "failed to read cnsprcy key from db");
                        return;
                    }
                },
                Ok(None) => {
                    error!("cnsprcy key not in db");
                    return;
                },
                Err(err) => {
                    error!(?err, "failed to read cnsprcy key from db");
                    return;
                }
            };
            let content = InvitationContent {
                id: self.id,
                name: self.name.clone(),
                key,
                addresses
            };
            let cb_public_key = pubkey.clone().get().into();
            match Invitation::encrypt(&cb_public_key, &content) {
                Ok(invitation) => replier.reply(invitation),
                Err(err) => {
                    error!(%err, %pubkey, "failed to encrypt invitation");
                }
            };
        },
        // Deactivate a peer, or the node itself.
        NodeReq::Disable((arg, replier)) => {
            let known_id = match &arg {
                // The node's own ID or name is checked before the table.
                NodeArg::ID(id) if id.get() == self.id => Some(self.id),
                NodeArg::Name(n) if n == &self.name => Some(self.id),
                NodeArg::ID(id) => self.table.contains_key(&id.get())
                    .then_some(id.get()),
                NodeArg::Name(n) => self.table.iter()
                    .find(|p| &p.name == n)
                    .map(|p| p.id)
            };
            let Some(id) = known_id else {
                error!(%arg, "no such (enabled) node");
                replier.reply(false);
                return;
            };
            replier.reply(
                self.author_op(id, Action::Active(false))
            );
            // Shut down after a self-disable, whatever author_op returned.
            if id == self.id {
                warn!("self-disabled successfully, shutting down");
                self.shutdown();
            }
        },
        NodeReq::Join((join_req, replier)) => replier.reply(
            self.join(join_req.id.into(), join_req.name, join_req.addr)
        ),
        // Reply with a clone of the peer; no reply if it cannot be resolved.
        NodeReq::GetConspirator((arg, replier)) => {
            if let Some(p) = self.resolve_and_then(arg, |p| p.clone()) {
                replier.reply(p);
            }
        },
        NodeReq::GetConspirators(req) => req.reply(
            self.table
                .iter()
                .map(|p| p.value().clone().into())
                .collect::<Vec<InlineConspirator>>()
        ),
        NodeReq::SendPayload(((arg, pyl, addr), replier)) => replier.reply(
            self.send_payload(arg, pyl, addr)
        ),
        // `addrs` and `handlers` come from the request and are echoed back.
        NodeReq::GetStatus(((addrs, handlers), replier)) => {
            let conspirators = self.table
                .iter()
                .map(|p| p.value().clone().into())
                .collect::<Vec<InlineConspirator>>();
            replier.reply(CnsprcyStatus {
                id: self.id.into(),
                name: self.name.clone(),
                addrs,
                handlers,
                conspirators
            })
        }
    }
}
591
592#[instrument(skip_all, level = "debug", name = "node_tick")]
600 pub fn tick_peers(&self) -> Duration {
601 let mut soonest_tick = Duration::MAX;
602 let mut addrs_to_ping = Vec::new();
603
604 for mut peer in self.table.iter_mut() {
605 let TickResult{
606 address_to_ping,
607 state_changed,
608 mut next_tick_in,
609 reach_out_to,
610 } = peer.tick();
611
612 if let Some(addr) = address_to_ping {
613 addrs_to_ping.push((peer.id, addr));
614 }
615 if let Some(index) = reach_out_to {
616 match self.db.reach_out_addr(peer.id, index) {
617 Ok(Some(addr)) => addrs_to_ping.push((peer.id, addr)),
618 Ok(None) => {
619 debug!(
620 peer = ?peer.value(),
621 "no known addresses to reach out to"
622 );
623 next_tick_in = Duration::MAX;
624 }
625 Err(err) => error!(
626 peer=%peer.value(),
627 %err,
628 index,
629 "failed to select reach-out address"
630 )
631 }
632 }
633 if let Some(state) = state_changed {
634 self.dispatch_event(PeerEvent::changed(peer.id, state));
635 }
636
637 soonest_tick = std::cmp::min(soonest_tick, next_tick_in);
638 }
639
640 if addrs_to_ping.is_empty() {debug!("no pings due")}
641 else {debug!("{} pings due", addrs_to_ping.len())}
642
643 for (id, addr) in addrs_to_ping {
644 self.ping(id, addr);
645 }
646
647 soonest_tick
648 }
649
650 pub fn decrypt(&self, invitation: Invitation)
653 -> Result<InvitationContent, String>
654 {
655 let priv_key = self.db.read_config(Config::PRIVATE_KEY)
656 .map_err(|e| format!("failed to read private key from db: {e}"))?
657 .ok_or("cannot accept invitation without private key")?
658 .parse::<KeyArg>()?;
659 let content = invitation.decrypt(&priv_key.get().into())?;
660 let InvitationContent { id, name, key, addresses } = &content;
661 info!(%id, %name, ?addresses, "accepting invitation");
662 match self.db.write_config(Config::KEY, &key.to_string()) {
664 Ok(true) => Ok(content),
665 Ok(false) => Err(
666 "writing new key to db didn't change any rows".to_string()
667 ),
668 Err(err) => Err(format!("failed to store new key in the db: {err}"))
669 }
670 }
671
672 #[instrument(skip(self), name = "node_join")]
673 fn join(&mut self, id: ID, name: String, addr: Address) -> bool {
674 if self.add_new_peer(Peer::new(id, name, addr)) {
675 self.send(id, Payload::None);
677 info!("message sent");
678 true
679 }
680 else {
681 error!("failed to add peer, message not sent");
682 false
683 }
684 }
685
686 pub fn leave(&self) -> bool {
687 self.broadcast(ProtocolPayload::Quit.into())
688 }
689
690 pub fn send_payload(
691 &self,
692 to: NodeArg,
693 pyl: Payload,
694 address: Option<Address>)
695 -> bool
696 {
697 let get_id_and_address = |p: &mut Peer| {
698 match address {
699 Some(addr) if address != p.get_address() => (p.id, Some(addr)),
700 Some(_) | None => (p.id, p.get_address_to_ping())
702 }
703 };
704 let is_id = match &to {
705 NodeArg::ID(id) => Some((id.get(), address)),
706 _ => None
707 };
708
709 self.resolve_and_then(to, get_id_and_address)
711 .or(is_id)
713 .and_then(|(id, maybe_addr)| maybe_addr.map(|a| (id, a)))
715 .map(|(dst, addr)| (addr, self.packet(dst, PingOrPong::Ping, pyl)))
716 .map(|pkt_to| self.send_packet(pkt_to))
717 .is_some()
718 }
719
720#[instrument(skip(self), level = "debug" name = "node_handle_pkt")]
727 pub fn handle_pkt(&mut self, from: Address, pkt: message::Packet) {
728 let new_address = match self.validate_packet(&pkt, from) {
737 ValidationResult::Ok => None,
738 ValidationResult::NewAddress(addr) => match &pkt.pyl {
739 Payload::Protocol(
740 ProtocolPayload::Ops(ops) | ProtocolPayload::Sync {ops, ..}
741 ) => {
742 let added_this_address = |op: &Op| {
744 op.target == pkt.src &&
745 op.action == Action::AddAddress(addr)
746 };
747 if ops.iter().any(added_this_address) {None}
749 else {Some(addr)}
750 },
751 _ => {Some(addr)}
752 },
753 ValidationResult::Disabled => {
754 if pkt.hsh == self.hash() {
756 warn!("in sync with disabled peer");
758 return;
759 }
760 match pkt.pyl {
761 Payload::Protocol(ProtocolPayload::Sync{counters, ops}) => {
762 let countered = !self.absorb(ops.into_iter());
764 match self.db.sync(&counters)
765 .and_then(|ops| Ok(ProtocolPayload::Sync {
766 ops,
767 counters: self.db.vector()?
768 }))
769 {
770 Ok(pyl) => {
771 self.send_payload(
772 pkt.src.into(),
773 pyl.into(),
774 Some(from)
775 );
776 }
777 Err(err) => {
778 error!(%err, "failed to create sync op");
779 }
780 }
781 if countered {
782 warn!("received counter-disable op, shutting down");
783 self.shutdown();
784 }
785 },
786 _ => {
787 self.send_payload(
788 pkt.src.into(),
789 self.init_sync(),
790 Some(from)
791 );
792 }
793 }
794 return;
795 },
796 ValidationResult::Invalid => {
797 warn!("Ignoring invalid packet from {}", from);
798 return
799 }
800 };
801 self.dispatch_event(PeerEvent::seen(&pkt));
802
803 self.process_vector_clock(pkt.vec);
810
811 let response = match pkt.pyl {
812 Payload::Protocol(pyl) => match pyl {
814 ProtocolPayload::Ops(ops) => {
815 if !self.absorb(ops.into_iter()) {
816 warn!("received disable op, shutting down");
817 self.shutdown();
818 return;
819 }
820 Payload::None
821 },
822 ProtocolPayload::Sync { counters, ops } => {
823 if !self.absorb(ops.into_iter()) {
824 warn!("received disable op, shutting down");
825 self.shutdown();
826 return;
827 }
828 if pkt.hsh == self.hash() {Payload::None} else {
829 self.db.sync(&counters)
830 .and_then(|ops| Ok(ProtocolPayload::Sync {
831 ops,
832 counters: self.db.vector()?
833 }))
834 .map(Payload::Protocol)
835 .unwrap_or_else(|err| {
836 error!(%err, "failed to sync");
837 Payload::None
838 })
839 }
840 },
841 ProtocolPayload::Quit => {
842 self.table
843 .get_mut(&pkt.src)
844 .expect("peer has to exist after validate_packet")
845 .has_quit();
846 self.dispatch_event(
848 PeerEvent::changed(pkt.src, Status::Quit)
849 );
850 debug!("peer quit, finishing early");
851 return
852 }
853 },
854 Payload::Dynamic(e) => self.handle_dynamic(pkt.src, e),
855 Payload::None => Payload::None
856 };
857
858 let mismatch = pkt.hsh != self.hash();
859
860 if response != Payload::None {
871 debug!(?response, "finished!");
872 self.send_or_respond(pkt.src, !pkt.png, response)
873 }
874 else if mismatch {
875 let resp = self.init_sync();
876 debug!(?resp, "synchronizing with peer");
877 self.send_or_respond(pkt.src, !pkt.png, resp)
878 }
879 else if let Some(addr) = new_address {
880 match self.add_address(pkt.src, addr) {
881 Ok(op) => self.send_or_respond(
882 pkt.src,
883 !pkt.png,
884 ProtocolPayload::Ops(vec![op]).into()
885 ),
886 Err(err) if pkt.png == PingOrPong::Ping => {
887 error!(%err, %addr, "failed to log new address");
888 debug!("ponging anyway");
889 self.respond(pkt.src, Payload::None)
890 },
891 Err(err) => error!(%err, %addr, "failed to log new address")
892 };
893 }
894 else if pkt.png == PingOrPong::Ping {
895 debug!(?response, "finished!");
896 self.send_or_respond(pkt.src, !pkt.png, response)
897 }
898 else {debug!("finished")}
899 }
900
/// Checks that `pkt` is addressed to this node and comes from a peer this
/// node may talk to, adding the sender to the table when necessary.
///
/// Senders not yet in the table are looked up in the store:
/// * a deactivated conspirator yields `Disabled` if its packet clock is
///   newer than the stored one (the new clock is persisted first) and
///   `Invalid` otherwise;
/// * an active conspirator is loaded into the table;
/// * anything else (including a failed store lookup) is added as an
///   unknown peer.
///
/// The final verdict for a sender in the table comes from `update_link`.
fn validate_packet(&self, pkt: &Packet, addr: Address) -> ValidationResult {
    if pkt.dst != self.id {
        warn!(%pkt.dst, "packet has invalid dst ID");
        return ValidationResult::Invalid;
    }
    let mut peer = if let Some(p) = self.table.get_mut(&pkt.src) {p} else {
        if let Ok(Some((c, addrs))) = self.db.get_conspirator(pkt.src) {
            if !c.active {
                // Packets with a clock that is not newer are dropped.
                if pkt.clk <= c.clock {
                    return ValidationResult::Invalid;
                }
                info!("contacted by disabled peer");
                return match self.db.persist_clocks([(pkt.src, pkt.clk)]) {
                    Ok(()) => ValidationResult::Disabled,
                    Err(err) => {
                        error!(
                            %err,
                            %pkt.clk,
                            %pkt.src,
                            "failed to persist clock for disabled peer"
                        );
                        ValidationResult::Invalid
                    }
                }
            }
            warn!(?c, "contacted by conspirator that was not in the table");
            // The result is not checked here; a failed insertion is caught
            // by the table lookup below.
            self.add_new_peer(
                Peer::load(c.id, c.name, c.clock, c.state, addrs)
            );
        }
        else {
            info!(id=%pkt.src, "contacted by unknown peer");
            if !self.add_new_peer(Peer::unknown(pkt.src)) {
                error!("unable to add peer, cannot process packet");
                return ValidationResult::Invalid
            }
        }
        let Some(peer) = self.table.get_mut(&pkt.src) else {
            error!("cannot get peer that was just added");
            return ValidationResult::Invalid
        };
        peer
    };
    self.update_link(&mut peer, pkt, addr)
}
953
/// Updates `peer` with the clock, address and ping/pong kind of a packet
/// received from it and reports the validation verdict.
///
/// Returns `Invalid` when `peer.new_clock` rejects the packet's clock.
/// Otherwise returns `NewAddress` if the peer has a pending address
/// afterwards, else `Ok`. Status and address changes are dispatched as
/// events along the way.
fn update_link(
    &self,
    peer: &mut Peer,
    pkt: &Packet,
    addr: Address)
    -> ValidationResult
{
    if peer.new_clock(pkt.clk) {
        if !peer.is_active() {
            self.dispatch_event(
                PeerEvent::changed(peer.id, Status::Active)
            );
        }
        let res = peer.has_pnged(addr, pkt.png);
        // NOTE(review): if `was_inactive` reflects the same condition as the
        // `!peer.is_active()` check above, the Active event is dispatched
        // twice for one packet — confirm whether that is intended.
        if res.was_inactive {
            self.dispatch_event(
                PeerEvent::changed(peer.id, Status::Active)
            );
        }
        if res.new_address {
            info!(%peer.id, %addr, "peer changed address");
            self.dispatch_event(PeerEvent::new_address(pkt.src, addr));
            // A failed store lookup is logged and treated as "not new".
            let is_unknown_address = self.db.new_address(peer.id, addr)
                .map_err(|err| error!(
                    %err,
                    %peer.id,
                    %addr,
                    "failed to determine if address is new"
                ))
                .unwrap_or(false);
            if is_unknown_address {
                info!(%peer.id, %addr, "discovered new address");
                peer.new_addr_pending = true;
                // NOTE(review): this is the same event as the one dispatched
                // a few lines above — confirm the second dispatch is wanted.
                self.dispatch_event(PeerEvent::new_address(pkt.src, addr));
            }
        }
        peer.get_pending_address()
            .map(ValidationResult::NewAddress)
            .unwrap_or(ValidationResult::Ok)
    }
    else {ValidationResult::Invalid}
}
998
999 fn process_vector_clock(&self, clock: Vec<PeerState>) {
1001 for ps in clock {
1002 match self.table.get_mut(&ps.id) {
1003 Some(mut p) => if let Some(status) = p.absorb_peer_state(ps) {
1004 self.dispatch_event(PeerEvent::changed(p.id, status));
1005 },
1006 None if ps.id == self.id =>
1007 error!("received PeerState with own ID"),
1008 None => {
1009 let id = ps.id;
1010 info!(%id, "discovered unknown peer");
1011 let state_changed = self.table.entry(id)
1012 .or_insert_with(|| Peer::unknown(id))
1013 .absorb_peer_state(ps);
1014 if let Some(status) = state_changed {
1015 self.dispatch_event(PeerEvent::changed(id, status));
1016 }
1017 }
1018 }
1019 }
1020 }
1021
/// Logs a dynamic payload received from peer `id` and forwards it to the
/// dispatcher as an `(id, payload)` event.
///
/// Always returns `Payload::None`.
fn handle_dynamic(&self, id: ID, dynamic: DynamicPayload) -> Payload {
    match &dynamic {
        DynamicPayload::Push{tag, msg} => {
            self.with_name(id, |name| info!(
                "[PUSH] {}: [{}] {}",
                name,
                tag,
                msg
            ));
        },
        DynamicPayload::Query{tag, msg} => {
            self.with_name(id, |name| info!(
                "[QURY] {}: [{}] {}",
                name,
                tag,
                msg
            ));
        },
        &DynamicPayload::Response{ref tag, ref msg, to} => {
            // `to` is an (ID, clock) pair naming what is being answered.
            let (resp_to_id, resp_to_clk) = to;
            if resp_to_id == self.id {
                // Response addressed to this node.
                self.with_name(id, |name| info!(
                    "[RESP] {} (/{}): [{}] {}",
                    name,
                    resp_to_clk,
                    tag,
                    msg
                ));
            }
            else if let Some(resp_to_peer) = self.table.get(&resp_to_id) {
                // Response addressed to a peer in the table: log its name.
                let resp_to_name = resp_to_peer.name.clone();
                self.with_name(id, |name| info!(
                    "[RESP] {} @ {}/{}: [{}] {}",
                    name,
                    resp_to_name,
                    resp_to_clk,
                    tag,
                    msg
                ));
            }
            else {
                // Addressee not in the table: log its raw ID instead.
                warn!(%resp_to_id, "received response to an unknown peer");
                self.with_name(id, |name| info!(
                    "[RESP] {} @ {}/{}: [{}] {}",
                    name,
                    resp_to_id,
                    resp_to_clk,
                    tag,
                    msg
                ));
            }
        }
    }
    self.dispatch_event((id, dynamic));
    Payload::None
}
1079
1080 fn with_name<R>(&self, id: ID, f: impl FnOnce(&str) -> R) -> Option<R> {
1081 self.table.view(&id, |_, p| f(p.name.as_str()))
1082 }
1083
1084}