cnsprcy/
node.rs

1use std::time::Duration;
2use std::sync::atomic::Ordering;
3
4#[cfg(target_has_atomic = "64")]
5use std::sync::atomic::AtomicU64;
6
7#[cfg(not(target_has_atomic = "64"))]
8use portable_atomic::AtomicU64;
9
10use crypto_box::SecretKey;
11use dashmap::DashMap;
12use rusqlite::{
13	Error,
14	types::Type,
15	Result as SqlResult
16};
17use tracing::instrument;
18
19use crate::control::{
20	Conspirator as InlineConspirator,
21	CnsprcyStatus,
22	KeyArg,
23	NodeArg,
24	NodeReq
25};
26use crate::event::{
27	AnyEvent,
28	PeerEvent
29};
30use crate::message::{
31	self,
32	DynamicPayload,
33	Payload,
34	Packet,
35	PingOrPong,
36	ProtocolPayload
37};
38use crate::peer::{
39	Address,
40	AtomicClock,
41	data::TickResult,
42	Invitation,
43	induct::Content as InvitationContent,
44	NodeID as ID,
45	Peer,
46	PeerState,
47	Status
48};
49use crate::store::{
50	Store,
51	Action,
52	Config,
53	Conspirator,
54	Op
55};
56
57use crate::util::*;
58
59#[derive(Debug)]
60pub struct Node {
61	id: ID,
62	name: String,
63	clk: AtomicClock,
64	hash: AtomicU64,
65	outbound: Sender<PktTo>,
66	dispatcher_sender: Sender<AnyEvent>,
67	db: Store,
68	table: DashMap<ID, Peer>
69}
70
71pub enum ValidationResult {
72	Ok,
73	/// Peer has a new(-ish) [`Address`] pending insertion into the op chain
74	NewAddress(Address),
75	/// Peer has been disabled, only allow Op sync
76	Disabled,
77	Invalid
78}
79
80impl Node {
81
82/*
83 *	CONSTRUCTOR
84 */
85
86	#[instrument(skip_all, level = "debug")]
87	pub fn load(
88		db: Store,
89		outbound: Sender<PktTo>,
90		dispatcher_sender: Sender<AnyEvent>)
91		-> SqlResult<Option<Self>>
92	{
93		let own_id = db.read_config(Config::ID)
94			.inspect_err(|err| error!(%err, "failed to load own id"))?
95			.ok_or(Error::QueryReturnedNoRows)?
96			.parse::<ID>()
97			.map_err(|err| Error::FromSqlConversionFailure(
98				1,
99				Type::Text,
100				format!("failed to load own node ID: {err}").into()
101			))?;
102		debug!(%own_id, "loaded");
103		let mut clock: Option<AtomicClock> = None;
104		let mut name: Option<String> = None;
105		let table = DashMap::new();
106		for (conspirator, addr_count) in db.get_conspirators()? {
107			debug!(?conspirator, "loading");
108			if conspirator.id == own_id {
109				clock = Some(AtomicClock::load(conspirator.clock));
110				name = Some(conspirator.name);
111				if !conspirator.active {
112					warn!("node has been disabled!");
113					return Ok(None);
114				}
115			}
116			else if conspirator.active {
117				let mut peer = Peer::load(
118					conspirator.id,
119					conspirator.name,
120					conspirator.clock,
121					conspirator.state,
122					addr_count
123				);
124				peer.update_address_count(addr_count);
125				table.insert(conspirator.id, peer);
126			}
127		}
128		let hash = db.hash()?.into();
129		let Some((name, clk)) = name.zip(clock) else {
130			error!(%own_id, "corrupt db: missing own entry among conspirators");
131			return Err(Error::QueryReturnedNoRows);
132		};
133
134		Ok(Some(Self {
135			id: own_id,
136			name,
137			clk,
138			hash,
139			outbound,
140			dispatcher_sender,
141			db,
142			table
143		}))
144	}
145
146	pub fn init(id: ID, name: String, db: Store)
147		-> SqlResult<(Self, Receiver<AnyEvent>, Receiver<PktTo>)>
148	{
149		// Create dispatcher channel
150		let (dispatcher_sender, dispatcher_receiver)
151			= new_channel::<AnyEvent>();
152
153		// Create outbound packet channel
154		let (outbound, outbound_packet_receiver)
155			= new_channel::<PktTo>();
156
157		db.write_config(Config::ID, &id.to_string())?;
158		let name_op = db.create_op(id, id, Action::Name(name))?;
159		db.absorb_op(name_op)?;
160		Self::load(db, outbound, dispatcher_sender).map(|node_opt| (
161			node_opt.expect("newly created node should be enabled"),
162			dispatcher_receiver,
163			outbound_packet_receiver
164		))
165	}
166
167	#[cfg(test)]
168	pub fn stop(self) -> Store {
169		self.db
170	}
171
172/*
173 *
174 *	HELPER METHODS
175 *
176 */
177
178	fn hash(&self) -> u64 {
179		self.hash.load(Ordering::Acquire)
180	}
181
182	fn update_hash(&self) {
183		match self.db.hash() {
184			Ok(hash) => self.hash.store(hash, Ordering::Release),
185			Err(err) => error!(%err, "failed to update hash")
186		}
187	}
188
189	fn vector_clock_for(&self, to: ID) -> Vec<PeerState> {
190		self.table
191			.iter()
192			.filter(|p| p.id != to)
193			.map(|p| p.get_peer_state())
194			.collect()
195	}
196
197	fn init_sync(&self) -> Payload {
198		match self.db.vector() {
199			Ok(counters) => ProtocolPayload::Sync {
200				counters,
201				ops: vec![] //TODO: send last 3 Ops or something
202			}.into(),
203			Err(err) => {
204				error!(%err, "failed to construct Sync payload");
205				Payload::None
206			}
207		}
208	}
209
210	/// Add a new [`Peer`] to the table
211	/// Doesn't create the corresponding [`Conspirator`] and [`Op`]s.
212	#[instrument(skip_all, fields(peer = %new), level = "debug")]
213	fn add_new_peer(&self, mut new: Peer) -> bool {
214		if let Some(old) = self.table.get(&new.id) {
215			error!(old = ?old.value(), ?new, "attempted to clobber peer");
216			false
217		}
218		else if new.id == self.id {
219			error!(%self.id, %new.name, "tried to add peer with own id");
220			false
221		}
222		else if self.db.get::<Conspirator>(new.id)
223			.is_ok_and(|o| o.is_some_and(|c| !c.active))
224		{
225			warn!("tried to re-add disabled peer");
226			false
227		}
228		else {
229			if let Some(addr) = new.get_address() {
230				match self.db.new_address(new.id, addr) {
231					Ok(true) => new.new_addr_pending = true,
232					Ok(false) => {},
233					Err(err) => error!(
234						%err,
235						%addr,
236						"failed to determine whether address is new"
237					)
238				}
239			}
240			self.table.insert(new.id, new);
241			info!("added");
242			true
243		}
244	}
245
246	fn resolve_and_then<F, R>(&self, node_arg: NodeArg, f: F) -> Option<R>
247		where F: Fn(&mut Peer) -> R
248	{
249		match node_arg {
250			NodeArg::ID(id) => self.table
251				.get_mut(&id.into())
252				.map(|mut p| f(&mut p)),
253			NodeArg::Name(name) => self.table
254				.iter_mut()
255				.find(|p| p.name == name)
256				.map(|mut p| f(&mut p))
257		}
258	}
259
260	#[instrument(skip(self), level = "debug")]
261	fn add_address(&mut self, id: ID, addr: Address) -> SqlResult<Op> {
262		let op = self.db.create_op(self.id, id, Action::AddAddress(addr))?;
263		info!(?op, "created");
264		// we didn't disable ourself & creating a new op can never unblock a held op
265		let _ = self.absorb([op.clone()].into_iter());
266
267		Ok(op)
268	}
269
270	/// Store new [`Op`]s & update state if necessary.
271	/// Returns `false` if this node has been disabled.
272	#[must_use]
273	fn absorb(&mut self, ops: impl Iterator<Item = Op>) -> bool {
274		let mut unchanged = true;
275		//TODO: nicer error message maybe
276		for op in ops {
277			match self.db.absorb_op(op) {
278				Ok(true) => unchanged = false,
279				Ok(false) => {},
280				Err(err) => error!(%err, "failed to absorb op")
281			}
282		}
283		if unchanged {return true}
284
285		self.update_hash();
286
287		let conspirators = match self.db.get_conspirators() {
288			Ok(conspirators) => conspirators,
289			Err(err) => {
290				error!(%err, "failed to load conspirators");
291				return true
292			}
293		};
294		for (conspirator, addr_count) in conspirators {
295			if conspirator.id == self.id {
296				self.name = conspirator.name;
297				if !conspirator.active {return false}
298			}
299			else if !conspirator.active {
300				if let Some((_id, p)) = self.table.remove(&conspirator.id) {
301					info!(%p, "removed deactivated conspirator");
302				}
303			}
304			else {
305				let Some(mut p) = self.table.get_mut(&conspirator.id) else {
306					let peer = Peer::load(
307						conspirator.id,
308						conspirator.name,
309						conspirator.clock,
310						conspirator.state,
311						addr_count
312					);
313					self.table.insert(conspirator.id, peer);
314					continue;
315				};
316				//TODO: if p.get_address().is_some() addr_count.min(1) ??
317				p.update_address_count(addr_count);
318				p.name = conspirator.name;
319				p.new_addr_pending = p.get_pending_address()
320					.and_then(|address| self.db.new_address(p.id, address)
321						.map_err(|err| error!(
322							%err,
323							%address,
324							"can't tell if address is new, assuming no"
325						))
326						.ok())
327					.inspect(|&pending| if !pending {
328						debug!(%p.id, "pending address cleared")
329					})
330					.unwrap_or(false);
331			}
332		}
333		true
334	}
335
336	//TODO: rework as author_ops by splitting out "reload" from absorb
337	#[instrument(skip(self), level = "debug")]
338	fn author_op(&mut self, target: ID, action: Action) -> bool {
339		match self.db.create_op(self.id, target, action) {
340			Ok(op) => {
341				// we'll check later if we disabled ourselves
342				let _ = self.absorb([op.clone()].into_iter());
343				// broadcast it
344				self.broadcast(ProtocolPayload::Ops(vec![op]).into())
345			},
346			Err(err) => {
347				error!(%err, "failed to create op");
348				false
349			}
350		}
351	}
352
353	/// Try to cause the daemon to shut down by closing all our channels
354	fn shutdown(&mut self) {
355		// Create dummy dispatcher channel
356		let (dispatcher_sender, _dispatcher_receiver)
357			= new_channel::<AnyEvent>();
358
359		// Create dummy outbound packet channel
360		let (outbound, _outbound_packet_receiver)
361			= new_channel::<PktTo>();
362
363		self.outbound = outbound;
364		self.dispatcher_sender = dispatcher_sender;
365	}
366
367/*
368 *	INTERNAL
369 */
370
371	fn send_packet(&self, pkt: PktTo) {
372		self.outbound
373			.send(pkt)
374			.unwrap_or_else(|pkt| error!(?pkt, "node failed to send packet"))
375	}
376
377	fn packet(&self, dst: ID, png: PingOrPong, pyl: Payload) -> Packet {
378		let clk = self.clk.next();
379		let hsh = self.hash();
380		let vec = self.vector_clock_for(dst);
381		Packet { dst, src: self.id, clk, hsh, vec, png, pyl }
382	}
383
384	fn ping(&self, to: ID, at: Address) {
385		let packet = self.packet(to, PingOrPong::Ping, Payload::None);
386		self.send_packet((at, packet));
387	}
388
389	fn send(&self, to: ID, pyl: Payload) {
390		let addr = self.table
391			.get_mut(&to)
392			.expect("send() to non-existent peer")
393			.get_address_to_ping()
394			.expect("send() to peer without address");
395		let packet = self.packet(to, PingOrPong::Ping, pyl);
396		self.send_packet((addr, packet));
397	}
398
399	fn respond(&self, to: ID, pyl: Payload) {
400		let addr = self.table
401			.get(&to)
402			.expect("respond to non-existent peer")
403			.get_address()
404			.expect("respond to peer without address");
405		let packet = self.packet(to, PingOrPong::Pong, pyl);
406		self.send_packet((addr, packet));
407	}
408
409	fn send_or_respond(&self, to: ID, png: PingOrPong, pyl: Payload) {
410		match png {
411			PingOrPong::Ping => self.send(to, pyl),
412			PingOrPong::Pong => self.respond(to, pyl)
413		}
414	}
415
416	fn broadcast(&self, pyl: Payload) -> bool {
417		let active_ids: Vec<ID> = self.table
418			.iter()
419			.filter(|p| p.is_active())
420			.map(|p| p.id)
421			.collect();
422		for active_id in active_ids {
423			self.send(active_id, pyl.clone())
424		}
425		let clocks = self.table.iter()
426			.map(|r| (*r.key(), r.value().get_peer_state().clk))
427			.chain([(self.id, self.clk.prev())]);
428		self.db.persist_clocks(clocks)
429			.map_err(|err| error!(%err, "failed to persist clocks"))
430			.is_ok()
431	}
432
433	fn dispatch_event<E: Into<AnyEvent>>(&self, event: E) {
434		self.dispatcher_sender
435			.send(event.into())
436			.unwrap_or_else(|evt| error!(?evt, "node failed to send event"))
437	}
438
439/*
440 *	EXTERNAL
441 */
442
443	pub fn handle_request(&mut self, req: NodeReq) {
444		match req {
445			NodeReq::Write(((key, value), replier)) => {
446				replier.reply(
447					self.author_op(self.id, Action::Write {key, value})
448				);
449			},
450			NodeReq::AddAddress(((arg, addr), replier)) => {
451				let known_id = match &arg {
452					NodeArg::ID(id) => self.table.contains_key(&id.get())
453						.then_some(id.get()),
454					NodeArg::Name(n) => self.table.iter()
455						.find(|p| &p.name == n)
456						.map(|p| p.id)
457				};
458				let Some(id) = known_id else {
459					error!(%arg, "no such node");
460					replier.reply(false);
461					return;
462				};
463				match self.db.new_address(id, addr) {
464					Ok(true) => match self.add_address(id, addr) {
465						Ok(_op) => replier.reply(true),
466						Err(err) => {
467							error!(%addr, %err, "failed to add address");
468							replier.reply(false)
469						}
470					},
471					//TODO: report that it already existed somehow?
472					Ok(false) => replier.reply(true),
473					Err(err) => {
474						error!(%addr, %err, "failed to add address");
475						replier.reply(false)
476					}
477				}
478			},
479			NodeReq::Advertise(req) => {
480				//TODO: warn/error if existing peers?
481				//create DH keypair
482				let private_key = KeyArg::random();
483				//store private key
484				let res = self.db.write_config(
485					Config::PRIVATE_KEY,
486					&private_key.to_string()
487				);
488				match res {
489					Ok(true) => {},
490					Ok(false) => warn!("unexpected number of changed rows"),
491					Err(err) => {
492						error!(?err, "failed to store new private key");
493						//TODO: reply with an error here
494						return;
495					}
496				}
497				//return public key
498				let public_key = SecretKey::from_bytes(private_key.get())
499					.public_key()
500					.to_bytes();
501				req.reply(public_key);
502			},
503			NodeReq::Invite(((pubkey, addresses), replier)) => {
504				let key = match self.db.read_config(Config::KEY) {
505					Ok(Some(key_str)) => match key_str.parse() {
506						Ok(key) => key,
507						Err(err) => {
508							error!(?err, "failed to read cnsprcy key from db");
509							return;
510						}
511					},
512					Ok(None) => {
513						error!("cnsprcy key not in db");
514						return;
515					},
516					Err(err) => {
517						error!(?err, "failed to read cnsprcy key from db");
518						return;
519					}
520				};
521				let content = InvitationContent {
522					id: self.id,
523					name: self.name.clone(),
524					key,
525					addresses
526				};
527				let cb_public_key = pubkey.clone().get().into();
528				match Invitation::encrypt(&cb_public_key, &content) {
529					Ok(invitation) => replier.reply(invitation),
530					Err(err) => {
531						error!(%err, %pubkey, "failed to encrypt invitation");
532					}
533				};
534			},
535			NodeReq::Disable((arg, replier)) => {
536				// the table should only contain active conspirators
537				let known_id = match &arg {
538					NodeArg::ID(id) if id.get() == self.id => Some(self.id),
539					NodeArg::Name(n) if n == &self.name => Some(self.id),
540					NodeArg::ID(id) => self.table.contains_key(&id.get())
541						.then_some(id.get()),
542					NodeArg::Name(n) => self.table.iter()
543						.find(|p| &p.name == n)
544						.map(|p| p.id)
545				};
546				let Some(id) = known_id else {
547					error!(%arg, "no such (enabled) node");
548					replier.reply(false);
549					return;
550				};
551				replier.reply(
552					self.author_op(id, Action::Active(false))
553				);
554				if id == self.id {
555					warn!("self-disabled successfully, shutting down");
556					self.shutdown();
557				}
558			},
559			NodeReq::Join((join_req, replier)) => replier.reply(
560				self.join(join_req.id.into(), join_req.name, join_req.addr)
561			),
562			NodeReq::GetConspirator((arg, replier)) => {
563				if let Some(p) = self.resolve_and_then(arg, |p| p.clone()) {
564					replier.reply(p);
565				}
566			},
567			NodeReq::GetConspirators(req) => req.reply(
568				self.table
569					.iter()
570					.map(|p| p.value().clone().into())
571					.collect::<Vec<InlineConspirator>>()
572			),
573			NodeReq::SendPayload(((arg, pyl, addr), replier)) => replier.reply(
574				self.send_payload(arg, pyl, addr)
575			),
576			NodeReq::GetStatus(((addrs, handlers), replier)) => {
577				let conspirators = self.table
578					.iter()
579					.map(|p| p.value().clone().into())
580					.collect::<Vec<InlineConspirator>>();
581				replier.reply(CnsprcyStatus {
582					id: self.id.into(),
583					name: self.name.clone(),
584					addrs,
585					handlers,
586					conspirators
587				})
588			}
589		}
590	}
591
592/*
593 *
594 *	ACTIVE
595 *
596 */
597 /* These functions are called when a control or timer event occurs */
598
599	#[instrument(skip_all, level = "debug", name = "node_tick")]
600	pub fn tick_peers(&self) -> Duration {
601		let mut soonest_tick = Duration::MAX;
602		let mut addrs_to_ping = Vec::new();
603
604		for mut peer in self.table.iter_mut() {
605			let TickResult{
606				address_to_ping,
607				state_changed,
608				mut next_tick_in,
609				reach_out_to,
610			} = peer.tick();
611
612			if let Some(addr) = address_to_ping {
613				addrs_to_ping.push((peer.id, addr));
614			}
615			if let Some(index) = reach_out_to {
616				match self.db.reach_out_addr(peer.id, index) {
617					Ok(Some(addr)) => addrs_to_ping.push((peer.id, addr)),
618					Ok(None) => {
619						debug!(
620							peer = ?peer.value(),
621							"no known addresses to reach out to"
622						);
623						next_tick_in = Duration::MAX;
624					}
625					Err(err) => error!(
626						peer=%peer.value(),
627						%err,
628						index,
629						"failed to select reach-out address"
630					)
631				}
632			}
633			if let Some(state) = state_changed {
634				self.dispatch_event(PeerEvent::changed(peer.id, state));
635			}
636
637			soonest_tick = std::cmp::min(soonest_tick, next_tick_in);
638		}
639
640		if addrs_to_ping.is_empty() {debug!("no pings due")}
641		else {debug!("{} pings due", addrs_to_ping.len())}
642
643		for (id, addr) in addrs_to_ping {
644			self.ping(id, addr);
645		}
646
647		soonest_tick
648	}
649
650	/// Try to decrypt the [`Invitation`] using the stored private key.
651	/// If decryption succeeds, this also stores the included encryption key.
652	pub fn decrypt(&self, invitation: Invitation)
653		-> Result<InvitationContent, String>
654	{
655		let priv_key = self.db.read_config(Config::PRIVATE_KEY)
656			.map_err(|e| format!("failed to read private key from db: {e}"))?
657			.ok_or("cannot accept invitation without private key")?
658			.parse::<KeyArg>()?;
659		let content = invitation.decrypt(&priv_key.get().into())?;
660		let InvitationContent { id, name, key, addresses } = &content;
661		info!(%id, %name, ?addresses, "accepting invitation");
662		//TODO store key here?
663		match self.db.write_config(Config::KEY, &key.to_string()) {
664			Ok(true) => Ok(content),
665			Ok(false) => Err(
666				"writing new key to db didn't change any rows".to_string()
667			),
668			Err(err) => Err(format!("failed to store new key in the db: {err}"))
669		}
670	}
671
672	#[instrument(skip(self), name = "node_join")]
673	fn join(&mut self, id: ID, name: String, addr: Address) -> bool {
674		if self.add_new_peer(Peer::new(id, name, addr)) {
675			// TODO Send op list or something!
676			self.send(id, Payload::None);
677			info!("message sent");
678			true
679		}
680		else {
681			error!("failed to add peer, message not sent");
682			false
683		}
684	}
685
686	pub fn leave(&self) -> bool {
687		self.broadcast(ProtocolPayload::Quit.into())
688	}
689
690	pub fn send_payload(
691		&self,
692		to: NodeArg,
693		pyl: Payload,
694		address: Option<Address>)
695		-> bool
696	{
697		let get_id_and_address = |p: &mut Peer| {
698			match address {
699				Some(addr) if address != p.get_address() => (p.id, Some(addr)),
700				// If the specified address is the currently active one, mark as pinged
701				Some(_) | None => (p.id, p.get_address_to_ping())
702			}
703		};
704		let is_id = match &to {
705			NodeArg::ID(id) => Some((id.get(), address)),
706			_ => None
707		};
708
709		// resolve NodeArg & (try) get address, mark as pinged if appropriate
710		self.resolve_and_then(to, get_id_and_address)
711			// if the NodeArg is an ID and unknown
712			.or(is_id)
713			// short-circuit if we don't have an address at this point
714			.and_then(|(id, maybe_addr)| maybe_addr.map(|a| (id, a)))
715			.map(|(dst, addr)| (addr, self.packet(dst, PingOrPong::Ping, pyl)))
716			.map(|pkt_to| self.send_packet(pkt_to))
717			.is_some()
718	}
719
720/*
721 *
722 *	REACTIVE
723 *
724 */
725
726	#[instrument(skip(self), level = "debug" name = "node_handle_pkt")]
727	pub fn handle_pkt(&mut self, from: Address, pkt: message::Packet) {
728		/* PACKET PROCESSING PIPELINE
729		 *	1. Validate
730		 *	2. Update Link
731		 *	3. Update Network
732		 *	4. React
733		 */
734
735		// 1. Packet validation & 2. Update link information
736		let new_address = match self.validate_packet(&pkt, from) {
737			ValidationResult::Ok => None,
738			ValidationResult::NewAddress(addr) => match &pkt.pyl {
739				Payload::Protocol(
740					ProtocolPayload::Ops(ops) | ProtocolPayload::Sync {ops, ..}
741				) => {
742					// check if new address already added by any ops
743					let added_this_address = |op: &Op| {
744						op.target == pkt.src &&
745						op.action == Action::AddAddress(addr)
746					};
747					// peer..new_addr_pending will be cleared when absorbing
748					if ops.iter().any(added_this_address) {None}
749					else {Some(addr)}
750				},
751				_ => {Some(addr)}
752			},
753			ValidationResult::Disabled => {
754				// allow op sync & nothing else
755				if pkt.hsh == self.hash() {
756					// peer should know it's disabled
757					warn!("in sync with disabled peer");
758					return;
759				}
760				match pkt.pyl {
761					Payload::Protocol(ProtocolPayload::Sync{counters, ops}) => {
762						// "counter-disable"
763						let countered = !self.absorb(ops.into_iter());
764						match self.db.sync(&counters)
765							.and_then(|ops| Ok(ProtocolPayload::Sync {
766								ops,
767								counters: self.db.vector()?
768							}))
769						{
770							Ok(pyl) => {
771								self.send_payload(
772									pkt.src.into(),
773									pyl.into(),
774									Some(from)
775								);
776							}
777							Err(err) => {
778								error!(%err, "failed to create sync op");
779							}
780						}
781						if countered {
782							warn!("received counter-disable op, shutting down");
783							self.shutdown();
784						}
785					},
786					_ => {
787						self.send_payload(
788							pkt.src.into(),
789							self.init_sync(),
790							Some(from)
791						);
792					}
793				}
794				return;
795			},
796			ValidationResult::Invalid => {
797				warn!("Ignoring invalid packet from {}", from);
798				return
799			}
800		};
801		self.dispatch_event(PeerEvent::seen(&pkt));
802
803		// 3. Update the network information
804		/* Updating information about the network:
805		 *	- process the vector clock
806		 *	- check if op-chain hashes match
807		 *	- process the payload
808		 */
809		self.process_vector_clock(pkt.vec);
810
811		let response = match pkt.pyl {
812			// 3. Handle packet (protocol) payload
813			Payload::Protocol(pyl) => match pyl {
814				ProtocolPayload::Ops(ops) => {
815					if !self.absorb(ops.into_iter()) {
816						warn!("received disable op, shutting down");
817						self.shutdown();
818						return;
819					}
820					Payload::None
821				},
822				ProtocolPayload::Sync { counters, ops } => {
823					if !self.absorb(ops.into_iter()) {
824						warn!("received disable op, shutting down");
825						self.shutdown();
826						return;
827					}
828					if pkt.hsh == self.hash() {Payload::None} else {
829						self.db.sync(&counters)
830							.and_then(|ops| Ok(ProtocolPayload::Sync {
831								ops,
832								counters: self.db.vector()?
833							}))
834							.map(Payload::Protocol)
835							.unwrap_or_else(|err| {
836								error!(%err, "failed to sync");
837								Payload::None
838							})
839					}
840				},
841				ProtocolPayload::Quit => {
842					self.table
843						.get_mut(&pkt.src)
844						.expect("peer has to exist after validate_packet")
845						.has_quit();
846					//TODO: prevent duplicates?
847					self.dispatch_event(
848						PeerEvent::changed(pkt.src, Status::Quit)
849					);
850					debug!("peer quit, finishing early");
851					return
852				}
853			},
854			Payload::Dynamic(e) => self.handle_dynamic(pkt.src, e),
855			Payload::None => Payload::None
856		};
857
858		let mismatch = pkt.hsh != self.hash();
859
860		// 4. React
861		/*	- Pong a Ping
862		 *	- Synchronize Ops
863		 *	- Event Response
864		 *
865		 *	→ Always send the response event
866		 *	→ Resolve hash mismatch eventually
867		 *	→ Create AddAddress Op after that
868		 */
869
870		if response != Payload::None {
871			debug!(?response, "finished!");
872			self.send_or_respond(pkt.src, !pkt.png, response)
873		}
874		else if mismatch {
875			let resp = self.init_sync();
876			debug!(?resp, "synchronizing with peer");
877			self.send_or_respond(pkt.src, !pkt.png, resp)
878		}
879		else if let Some(addr) = new_address {
880			match self.add_address(pkt.src, addr) {
881				Ok(op) => self.send_or_respond(
882					pkt.src,
883					!pkt.png,
884					ProtocolPayload::Ops(vec![op]).into()
885				),
886				Err(err) if pkt.png == PingOrPong::Ping => {
887					error!(%err, %addr, "failed to log new address");
888					debug!("ponging anyway");
889					self.respond(pkt.src, Payload::None)
890				},
891				Err(err) => error!(%err, %addr, "failed to log new address")
892			};
893		}
894		else if pkt.png == PingOrPong::Ping {
895			debug!(?response, "finished!");
896			self.send_or_respond(pkt.src, !pkt.png, response)
897		}
898		else {debug!("finished")}
899	}
900
901	/// 1. Packet validation
902	fn validate_packet(&self, pkt: &Packet, addr: Address) -> ValidationResult {
903		/* Validation:
904		 *	- Crypto (already done)
905		 *	- src & dst NodeIDs
906		 *	- clk value
907		 */
908		if pkt.dst != self.id {
909			warn!(%pkt.dst, "packet has invalid dst ID");
910			return ValidationResult::Invalid;
911		}
912		let mut peer = if let Some(p) = self.table.get_mut(&pkt.src) {p} else {
913			if let Ok(Some((c, addrs))) = self.db.get_conspirator(pkt.src) {
914				if !c.active {
915					if pkt.clk <= c.clock {
916						return ValidationResult::Invalid;
917					}
918					info!("contacted by disabled peer");
919					// update clock in the db
920					return match self.db.persist_clocks([(pkt.src, pkt.clk)]) {
921						Ok(()) => ValidationResult::Disabled,
922						Err(err) => {
923							error!(
924								%err,
925								%pkt.clk,
926								%pkt.src,
927								"failed to persist clock for disabled peer"
928							);
929							ValidationResult::Invalid
930						}
931					}
932				}
933				warn!(?c, "contacted by conspirator that was not in the table");
934				self.add_new_peer(
935					Peer::load(c.id, c.name, c.clock, c.state, addrs)
936				);
937			}
938			else {
939				info!(id=%pkt.src, "contacted by unknown peer");
940				if !self.add_new_peer(Peer::unknown(pkt.src)) {
941					error!("unable to add peer, cannot process packet");
942					return ValidationResult::Invalid
943				}
944			}
945			let Some(peer) = self.table.get_mut(&pkt.src) else {
946				error!("cannot get peer that was just added");
947				return ValidationResult::Invalid
948			};
949			peer
950		};
951		self.update_link(&mut peer, pkt, addr)
952	}
953
954	/// 2. Update peer link information
955	fn update_link(
956		&self,
957		peer: &mut Peer,
958		pkt: &Packet,
959		addr: Address)
960		-> ValidationResult
961	{
962		if peer.new_clock(pkt.clk) {
963			if !peer.is_active() {
964				self.dispatch_event(
965					PeerEvent::changed(peer.id, Status::Active)
966				);
967			}
968			let res = peer.has_pnged(addr, pkt.png);
969			if res.was_inactive {
970				self.dispatch_event(
971					PeerEvent::changed(peer.id, Status::Active)
972				);
973			}
974			if res.new_address {
975				info!(%peer.id, %addr, "peer changed address");
976				self.dispatch_event(PeerEvent::new_address(pkt.src, addr));
977				let is_unknown_address = self.db.new_address(peer.id, addr)
978					.map_err(|err| error!(
979						%err,
980						%peer.id,
981						%addr,
982						"failed to determine if address is new"
983					))
984					.unwrap_or(false);
985				if is_unknown_address {
986					info!(%peer.id, %addr, "discovered new address");
987					// another peer could have discovered it first, so mark it as pending until we can assume to be synced
988					peer.new_addr_pending = true;
989					self.dispatch_event(PeerEvent::new_address(pkt.src, addr));
990				}
991			}
992			peer.get_pending_address()
993				.map(ValidationResult::NewAddress)
994				.unwrap_or(ValidationResult::Ok)
995		}
996		else {ValidationResult::Invalid}
997	}
998
999	/// 3. Update the network information
1000	fn process_vector_clock(&self, clock: Vec<PeerState>) {
1001		for ps in clock {
1002			match self.table.get_mut(&ps.id) {
1003				Some(mut p) => if let Some(status) = p.absorb_peer_state(ps) {
1004					self.dispatch_event(PeerEvent::changed(p.id, status));
1005				},
1006				None if ps.id == self.id =>
1007					error!("received PeerState with own ID"),
1008				None => {
1009					let id = ps.id;
1010					info!(%id, "discovered unknown peer");
1011					let state_changed = self.table.entry(id)
1012						.or_insert_with(|| Peer::unknown(id))
1013						.absorb_peer_state(ps);
1014					if let Some(status) = state_changed {
1015						self.dispatch_event(PeerEvent::changed(id, status));
1016					}
1017				}
1018			}
1019		}
1020	}
1021
1022	/// 3. Handle packet (dynamic) payload
1023	fn handle_dynamic(&self, id: ID, dynamic: DynamicPayload) -> Payload {
1024		match &dynamic {
1025			DynamicPayload::Push{tag, msg} => {
1026				self.with_name(id, |name| info!(
1027					"[PUSH] {}: [{}] {}",
1028					name,
1029					tag,
1030					msg
1031				));
1032			},
1033			DynamicPayload::Query{tag, msg} => {
1034				self.with_name(id, |name| info!(
1035					"[QURY] {}: [{}] {}",
1036					name,
1037					tag,
1038					msg
1039				));
1040			},
1041			&DynamicPayload::Response{ref tag, ref msg, to} => {
1042				let (resp_to_id, resp_to_clk) = to;
1043				if resp_to_id == self.id {
1044					self.with_name(id, |name| info!(
1045						"[RESP] {} (/{}): [{}] {}",
1046						name,
1047						resp_to_clk,
1048						tag,
1049						msg
1050					));
1051				}
1052				else if let Some(resp_to_peer) = self.table.get(&resp_to_id) {
1053					let resp_to_name = resp_to_peer.name.clone();
1054					self.with_name(id, |name| info!(
1055						"[RESP] {} @ {}/{}: [{}] {}",
1056						name,
1057						resp_to_name,
1058						resp_to_clk,
1059						tag,
1060						msg
1061					));
1062				}
1063				else {
1064					warn!(%resp_to_id, "received response to an unknown peer");
1065					self.with_name(id, |name| info!(
1066						"[RESP] {} @ {}/{}: [{}] {}",
1067						name,
1068						resp_to_id,
1069						resp_to_clk,
1070						tag,
1071						msg
1072					));
1073				}
1074			}
1075		}
1076		self.dispatch_event((id, dynamic));
1077		Payload::None
1078	}
1079
1080	fn with_name<R>(&self, id: ID, f: impl FnOnce(&str) -> R) -> Option<R> {
1081		self.table.view(&id, |_, p| f(p.name.as_str()))
1082	}
1083
1084}