1use crate::{
2 AuthorizedDevice, DevicePubkey, DeviceRoster, DomainError, Error, Invite, InviteResponse,
3 InviteResponseEnvelope, MessageEnvelope, OwnerPubkey, ProtocolContext, Result,
4 RosterSnapshotDecision, Session, SessionState, UnixSeconds,
5};
6use rand::{CryptoRng, RngCore};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9
10const MAX_INACTIVE_SESSIONS: usize = 10;
11#[derive(Debug, Clone)]
12pub struct SessionManager {
13 local_owner_pubkey: OwnerPubkey,
14 local_device_pubkey: DevicePubkey,
15 local_device_secret_key: [u8; 32],
16 local_invite: Option<Invite>,
17 users: BTreeMap<OwnerPubkey, UserRecord>,
18}
19
20#[derive(Debug, Clone)]
21struct UserRecord {
22 owner_pubkey: OwnerPubkey,
23 roster: Option<DeviceRoster>,
24 devices: BTreeMap<DevicePubkey, DeviceRecord>,
25}
26
27#[derive(Debug, Clone)]
28struct DeviceRecord {
29 device_pubkey: DevicePubkey,
30 authorized: bool,
31 is_stale: bool,
32 stale_since: Option<UnixSeconds>,
33 claimed_owner_pubkey: Option<OwnerPubkey>,
34 public_invite: Option<Invite>,
35 invite_response_generated: bool,
36 active_session: Option<Session>,
37 inactive_sessions: Vec<Session>,
38 last_activity: Option<UnixSeconds>,
39 created_at: UnixSeconds,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct SessionManagerSnapshot {
44 pub local_owner_pubkey: OwnerPubkey,
45 pub local_device_pubkey: DevicePubkey,
46 pub local_invite: Option<Invite>,
47 pub users: Vec<UserRecordSnapshot>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
51pub struct UserRecordSnapshot {
52 pub owner_pubkey: OwnerPubkey,
53 pub roster: Option<DeviceRoster>,
54 pub devices: Vec<DeviceRecordSnapshot>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
58pub struct DeviceRecordSnapshot {
59 pub device_pubkey: DevicePubkey,
60 pub authorized: bool,
61 pub is_stale: bool,
62 pub stale_since: Option<UnixSeconds>,
63 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub claimed_owner_pubkey: Option<OwnerPubkey>,
65 pub public_invite: Option<Invite>,
66 #[serde(default)]
67 pub invite_response_generated: bool,
68 pub active_session: Option<SessionState>,
69 pub inactive_sessions: Vec<SessionState>,
70 pub last_activity: Option<UnixSeconds>,
71 pub created_at: UnixSeconds,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct PreparedSend {
76 pub recipient_owner: OwnerPubkey,
77 pub payload: Vec<u8>,
78 pub deliveries: Vec<Delivery>,
79 pub invite_responses: Vec<InviteResponseEnvelope>,
80 pub relay_gaps: Vec<RelayGap>,
81}
82
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub struct Delivery {
85 pub owner_pubkey: OwnerPubkey,
86 pub device_pubkey: DevicePubkey,
87 pub envelope: MessageEnvelope,
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub struct ProcessedInviteResponse {
92 pub owner_pubkey: OwnerPubkey,
93 pub device_pubkey: DevicePubkey,
94 pub claimed_owner_pubkey: Option<OwnerPubkey>,
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct ReceivedMessage {
99 pub owner_pubkey: OwnerPubkey,
100 pub device_pubkey: DevicePubkey,
101 pub payload: Vec<u8>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
105pub enum RelayGap {
106 MissingRoster {
107 owner_pubkey: OwnerPubkey,
108 },
109 MissingDeviceInvite {
110 owner_pubkey: OwnerPubkey,
111 device_pubkey: DevicePubkey,
112 },
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
116pub struct PruneReport {
117 pub removed_devices: Vec<(OwnerPubkey, DevicePubkey)>,
118 pub removed_users: Vec<OwnerPubkey>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
122struct TargetDevice {
123 owner_pubkey: OwnerPubkey,
124 device_pubkey: DevicePubkey,
125}
126
/// Identifies which of a device's sessions should be used for sending.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SendSessionSource {
    /// Use `DeviceRecord::active_session`.
    Active,
    /// Use the entry at this index of `DeviceRecord::inactive_sessions`.
    Inactive(usize),
}
132
133impl SessionManager {
134 pub fn new(local_owner_pubkey: OwnerPubkey, local_device_secret_key: [u8; 32]) -> Self {
135 let local_device_pubkey = crate::device_pubkey_from_secret_bytes(&local_device_secret_key)
136 .expect("local device secret key must derive a valid device public key");
137
138 Self {
139 local_owner_pubkey,
140 local_device_pubkey,
141 local_device_secret_key,
142 local_invite: None,
143 users: BTreeMap::new(),
144 }
145 }
146
147 pub fn from_snapshot(
148 snapshot: SessionManagerSnapshot,
149 local_device_secret_key: [u8; 32],
150 ) -> Result<Self> {
151 let derived_local_device_pubkey =
152 crate::device_pubkey_from_secret_bytes(&local_device_secret_key)?;
153 if derived_local_device_pubkey != snapshot.local_device_pubkey {
154 return Err(DomainError::InvalidState(
155 "snapshot local device pubkey does not match provided secret key".to_string(),
156 )
157 .into());
158 }
159
160 let users = snapshot
161 .users
162 .into_iter()
163 .map(UserRecord::from_snapshot)
164 .map(|record| (record.owner_pubkey, record))
165 .collect();
166
167 Ok(Self {
168 local_owner_pubkey: snapshot.local_owner_pubkey,
169 local_device_pubkey: snapshot.local_device_pubkey,
170 local_device_secret_key,
171 local_invite: snapshot.local_invite,
172 users,
173 })
174 }
175
176 pub fn snapshot(&self) -> SessionManagerSnapshot {
177 SessionManagerSnapshot {
178 local_owner_pubkey: self.local_owner_pubkey,
179 local_device_pubkey: self.local_device_pubkey,
180 local_invite: self.local_invite.clone(),
181 users: self.users.values().map(UserRecord::snapshot).collect(),
182 }
183 }
184
185 pub fn local_device_pubkey(&self) -> DevicePubkey {
186 self.local_device_pubkey
187 }
188
189 pub fn replace_local_invite(&mut self, invite: Invite) {
190 self.local_invite = Some(invite);
191 }
192
193 pub fn ensure_local_invite<R>(&mut self, ctx: &mut ProtocolContext<'_, R>) -> Result<&Invite>
194 where
195 R: RngCore + CryptoRng,
196 {
197 if self.local_invite.is_none() {
198 let invite = Invite::create_new_with_context(
199 ctx,
200 self.local_device_pubkey,
201 Some(self.local_owner_pubkey),
202 None,
203 )?;
204 self.observe_public_invite(self.local_owner_pubkey, invite.clone())?;
205 self.local_invite = Some(invite);
206 }
207
208 Ok(self.local_invite.as_ref().expect("local invite must exist"))
209 }
210
211 pub fn apply_local_roster(&mut self, roster: DeviceRoster) -> RosterSnapshotDecision {
212 self.apply_roster_for_owner(self.local_owner_pubkey, roster)
213 }
214
215 pub fn replace_local_roster(&mut self, roster: DeviceRoster) -> RosterSnapshotDecision {
216 self.apply_roster_for_owner_inner(self.local_owner_pubkey, roster, true)
217 }
218
219 pub fn observe_peer_roster(
220 &mut self,
221 owner_pubkey: OwnerPubkey,
222 roster: DeviceRoster,
223 ) -> RosterSnapshotDecision {
224 self.apply_roster_for_owner(owner_pubkey, roster)
225 }
226
227 pub fn observe_device_invite(
228 &mut self,
229 owner_pubkey: OwnerPubkey,
230 invite: Invite,
231 ) -> Result<()> {
232 self.observe_public_invite(owner_pubkey, invite)
233 }
234
235 pub fn observe_invite_response<R>(
236 &mut self,
237 ctx: &mut ProtocolContext<'_, R>,
238 envelope: &InviteResponseEnvelope,
239 ) -> Result<Option<ProcessedInviteResponse>>
240 where
241 R: RngCore + CryptoRng,
242 {
243 let Some(invite) = self.local_invite.clone() else {
244 return Ok(None);
245 };
246
247 let mut owned_invite = invite;
248 let InviteResponse {
249 session,
250 invitee_device_pubkey,
251 invitee_owner_pubkey,
252 ..
253 } = owned_invite.process_response(ctx, envelope, self.local_device_secret_key)?;
254
255 self.local_invite = Some(owned_invite);
256
257 let device_owner_pubkey = crate::owner_pubkey_from_device_pubkey(invitee_device_pubkey);
258 let invitee_owner_pubkey = invitee_owner_pubkey.ok_or_else(|| {
259 DomainError::InvalidState("invite response missing owner claim".to_string())
260 })?;
261 let claimed_owner_pubkey =
262 (invitee_owner_pubkey != device_owner_pubkey).then_some(invitee_owner_pubkey);
263 let owner_pubkey = claimed_owner_pubkey
264 .filter(|claimed_owner_pubkey| {
265 self.users
266 .get(claimed_owner_pubkey)
267 .and_then(|user| user.roster.as_ref())
268 .and_then(|roster| roster.get_device(&invitee_device_pubkey))
269 .is_some()
270 })
271 .unwrap_or(device_owner_pubkey);
272 let should_seed_single_device_roster = owner_pubkey == device_owner_pubkey
273 && self
274 .users
275 .get(&owner_pubkey)
276 .and_then(|user| user.roster.as_ref())
277 .is_none();
278 let user = self.user_record_mut(owner_pubkey);
279 if should_seed_single_device_roster {
280 user.roster = Some(DeviceRoster::new(
281 ctx.now,
282 vec![AuthorizedDevice::new(invitee_device_pubkey, ctx.now)],
283 ));
284 }
285 let record = user.device_record_mut(invitee_device_pubkey, ctx.now);
286 record.claimed_owner_pubkey = claimed_owner_pubkey
287 .filter(|claimed_owner_pubkey| *claimed_owner_pubkey != owner_pubkey);
288 if should_seed_single_device_roster {
289 record.authorized = true;
290 record.is_stale = false;
291 }
292 record.invite_response_generated = true;
293 record.upsert_session(session, ctx.now);
294
295 Ok(Some(ProcessedInviteResponse {
296 owner_pubkey,
297 device_pubkey: invitee_device_pubkey,
298 claimed_owner_pubkey: record.claimed_owner_pubkey,
299 }))
300 }
301
302 pub fn prepare_send<R>(
303 &mut self,
304 ctx: &mut ProtocolContext<'_, R>,
305 recipient_owner: OwnerPubkey,
306 payload: Vec<u8>,
307 ) -> Result<PreparedSend>
308 where
309 R: RngCore + CryptoRng,
310 {
311 self.prepare_send_inner(ctx, recipient_owner, payload, true)
312 }
313
314 pub fn prepare_remote_send<R>(
321 &mut self,
322 ctx: &mut ProtocolContext<'_, R>,
323 recipient_owner: OwnerPubkey,
324 payload: Vec<u8>,
325 ) -> Result<PreparedSend>
326 where
327 R: RngCore + CryptoRng,
328 {
329 self.prepare_send_inner(ctx, recipient_owner, payload, false)
330 }
331
332 pub fn prepare_remote_send_to_devices<R>(
333 &mut self,
334 ctx: &mut ProtocolContext<'_, R>,
335 recipient_owner: OwnerPubkey,
336 device_pubkeys: impl IntoIterator<Item = DevicePubkey>,
337 payload: Vec<u8>,
338 ) -> Result<PreparedSend>
339 where
340 R: RngCore + CryptoRng,
341 {
342 let targets = device_pubkeys
343 .into_iter()
344 .map(|device_pubkey| TargetDevice {
345 owner_pubkey: recipient_owner,
346 device_pubkey,
347 })
348 .collect();
349 self.prepare_explicit_send(ctx, recipient_owner, targets, payload, false)
350 }
351
352 pub fn prepare_local_sibling_send<R>(
353 &mut self,
354 ctx: &mut ProtocolContext<'_, R>,
355 payload: Vec<u8>,
356 ) -> Result<PreparedSend>
357 where
358 R: RngCore + CryptoRng,
359 {
360 self.prepare_local_sibling_send_inner(ctx, payload, false)
361 }
362
363 pub fn prepare_local_sibling_send_to_devices<R>(
364 &mut self,
365 ctx: &mut ProtocolContext<'_, R>,
366 device_pubkeys: impl IntoIterator<Item = DevicePubkey>,
367 payload: Vec<u8>,
368 ) -> Result<PreparedSend>
369 where
370 R: RngCore + CryptoRng,
371 {
372 let targets = device_pubkeys
373 .into_iter()
374 .map(|device_pubkey| TargetDevice {
375 owner_pubkey: self.local_owner_pubkey,
376 device_pubkey,
377 })
378 .collect();
379 self.prepare_explicit_send(ctx, self.local_owner_pubkey, targets, payload, false)
380 }
381
382 pub fn prepare_local_sibling_send_reusing_sessions<R>(
383 &mut self,
384 ctx: &mut ProtocolContext<'_, R>,
385 payload: Vec<u8>,
386 ) -> Result<PreparedSend>
387 where
388 R: RngCore + CryptoRng,
389 {
390 self.prepare_local_sibling_send_inner(ctx, payload, false)
391 }
392
393 pub fn prepare_local_sibling_send_refreshing_one_way_sessions<R>(
394 &mut self,
395 ctx: &mut ProtocolContext<'_, R>,
396 payload: Vec<u8>,
397 ) -> Result<PreparedSend>
398 where
399 R: RngCore + CryptoRng,
400 {
401 self.prepare_local_sibling_send_inner(ctx, payload, true)
402 }
403
404 pub fn prepare_local_sibling_send_reusing_all_sessions<R>(
405 &mut self,
406 ctx: &mut ProtocolContext<'_, R>,
407 payload: Vec<u8>,
408 ) -> Result<PreparedSend>
409 where
410 R: RngCore + CryptoRng,
411 {
412 let mut targets = BTreeSet::new();
413 self.collect_local_sibling_targets(&mut targets);
414
415 let mut deliveries = Vec::new();
416 let mut invite_responses = Vec::new();
417 let mut relay_gaps = Vec::new();
418
419 for target in targets {
420 let mut target_deliveries = self.prepare_device_deliveries_for_all_send_sessions(
421 ctx,
422 target.owner_pubkey,
423 target.device_pubkey,
424 &payload,
425 )?;
426 if target_deliveries.is_empty() {
427 match self.prepare_device_delivery(
428 ctx,
429 target.owner_pubkey,
430 target.device_pubkey,
431 &payload,
432 false,
433 )? {
434 Some((delivery, maybe_response)) => {
435 target_deliveries.push(delivery);
436 if let Some(response) = maybe_response {
437 invite_responses.push(response);
438 }
439 }
440 None => {
441 relay_gaps.push(RelayGap::MissingDeviceInvite {
442 owner_pubkey: target.owner_pubkey,
443 device_pubkey: target.device_pubkey,
444 });
445 }
446 }
447 }
448 deliveries.extend(target_deliveries);
449 }
450
451 relay_gaps.sort();
452
453 Ok(PreparedSend {
454 recipient_owner: self.local_owner_pubkey,
455 payload,
456 deliveries,
457 invite_responses,
458 relay_gaps,
459 })
460 }
461
462 fn prepare_local_sibling_send_inner<R>(
463 &mut self,
464 ctx: &mut ProtocolContext<'_, R>,
465 payload: Vec<u8>,
466 refresh_one_way_bootstrap: bool,
467 ) -> Result<PreparedSend>
468 where
469 R: RngCore + CryptoRng,
470 {
471 let mut targets = BTreeSet::new();
472 self.collect_local_sibling_targets(&mut targets);
473
474 let mut deliveries = Vec::new();
475 let mut invite_responses = Vec::new();
476 let mut relay_gaps = Vec::new();
477
478 for target in targets {
479 match self.prepare_device_delivery(
480 ctx,
481 target.owner_pubkey,
482 target.device_pubkey,
483 &payload,
484 refresh_one_way_bootstrap,
485 )? {
486 Some((delivery, maybe_response)) => {
487 deliveries.push(delivery);
488 if let Some(response) = maybe_response {
489 invite_responses.push(response);
490 }
491 }
492 None => {
493 relay_gaps.push(RelayGap::MissingDeviceInvite {
494 owner_pubkey: target.owner_pubkey,
495 device_pubkey: target.device_pubkey,
496 });
497 }
498 }
499 }
500
501 relay_gaps.sort();
502
503 Ok(PreparedSend {
504 recipient_owner: self.local_owner_pubkey,
505 payload,
506 deliveries,
507 invite_responses,
508 relay_gaps,
509 })
510 }
511
512 pub(crate) fn has_authorized_local_siblings(&self) -> bool {
513 let Some(user) = self.users.get(&self.local_owner_pubkey) else {
514 return false;
515 };
516 if user.roster.is_none() {
517 return false;
518 }
519 user.authorized_non_stale_devices()
520 .into_iter()
521 .any(|device_pubkey| device_pubkey != self.local_device_pubkey)
522 }
523
524 pub fn receive<R>(
525 &mut self,
526 ctx: &mut ProtocolContext<'_, R>,
527 sender_owner: OwnerPubkey,
528 envelope: &MessageEnvelope,
529 ) -> Result<Option<ReceivedMessage>>
530 where
531 R: RngCore + CryptoRng,
532 {
533 let Some(user) = self.users.get_mut(&sender_owner) else {
534 return Ok(None);
535 };
536
537 let device_pubkeys: Vec<DevicePubkey> = user.devices.keys().copied().collect();
538 for device_pubkey in device_pubkeys {
539 let record = user
540 .devices
541 .get_mut(&device_pubkey)
542 .expect("device key collected from map");
543
544 if let Some(active_session) = record.active_session.as_ref() {
545 if active_session.matches_sender(envelope.sender) {
546 let plan = active_session.plan_receive(ctx, envelope)?;
547 let outcome = record
548 .active_session
549 .as_mut()
550 .expect("active session must still exist")
551 .apply_receive(plan);
552 record.last_activity = Some(ctx.now);
553 return Ok(Some(ReceivedMessage {
554 owner_pubkey: sender_owner,
555 device_pubkey,
556 payload: outcome.payload,
557 }));
558 }
559 }
560
561 let mut matched_inactive = None;
562 for (index, session) in record.inactive_sessions.iter().enumerate() {
563 if !session.matches_sender(envelope.sender) {
564 continue;
565 }
566 let plan = session.plan_receive(ctx, envelope)?;
567 matched_inactive = Some((index, plan));
568 break;
569 }
570
571 if let Some((index, plan)) = matched_inactive {
572 let mut session = record.inactive_sessions.remove(index);
573 let outcome = session.apply_receive(plan);
574 record.promote_inactive_session(session);
575 record.last_activity = Some(ctx.now);
576 return Ok(Some(ReceivedMessage {
577 owner_pubkey: sender_owner,
578 device_pubkey,
579 payload: outcome.payload,
580 }));
581 }
582 }
583
584 Ok(None)
585 }
586
587 pub fn prune_stale(&mut self, _now: UnixSeconds) -> PruneReport {
588 let mut removed_devices = Vec::new();
589 let mut removed_users = Vec::new();
590
591 self.users.retain(|owner_pubkey, user| {
592 user.devices.retain(|device_pubkey, record| {
593 let keep = !record.is_stale;
594 if !keep {
595 removed_devices.push((*owner_pubkey, *device_pubkey));
596 }
597 keep
598 });
599
600 let keep_user = !user.devices.is_empty() || user.roster.is_some();
601 if !keep_user {
602 removed_users.push(*owner_pubkey);
603 }
604 keep_user
605 });
606
607 removed_devices.sort();
608 removed_users.sort();
609
610 PruneReport {
611 removed_devices,
612 removed_users,
613 }
614 }
615
616 pub fn delete_user(&mut self, owner_pubkey: OwnerPubkey) {
617 if owner_pubkey != self.local_owner_pubkey {
618 self.users.remove(&owner_pubkey);
619 }
620 }
621
622 pub fn import_session_state(
623 &mut self,
624 owner_pubkey: OwnerPubkey,
625 device_pubkey: DevicePubkey,
626 state: SessionState,
627 now: UnixSeconds,
628 ) {
629 let user = self.user_record_mut(owner_pubkey);
630 let record = user.device_record_mut(device_pubkey, now);
631 record.authorized = true;
632 record.is_stale = false;
633 record.invite_response_generated = true;
634 record.upsert_session(Session::from_state(state), now);
635 }
636
637 fn prepare_device_delivery<R>(
638 &mut self,
639 ctx: &mut ProtocolContext<'_, R>,
640 owner_pubkey: OwnerPubkey,
641 device_pubkey: DevicePubkey,
642 payload: &[u8],
643 refresh_one_way_bootstrap: bool,
644 ) -> Result<Option<(Delivery, Option<InviteResponseEnvelope>)>>
645 where
646 R: RngCore + CryptoRng,
647 {
648 let claimed_owner = Some(self.local_owner_pubkey);
649 let local_owner_pubkey = self.local_owner_pubkey;
650 let local_device_pubkey = self.local_device_pubkey;
651 let local_device_secret_key = self.local_device_secret_key;
652 let user = self.user_record_mut(owner_pubkey);
653 let record = user.device_record_mut(device_pubkey, ctx.now);
654
655 if !record.authorized || record.is_stale {
656 return Ok(None);
657 }
658
659 let source = record.best_send_session_source();
660 let should_refresh_local_sibling_bootstrap = refresh_one_way_bootstrap
661 && owner_pubkey == local_owner_pubkey
662 && device_pubkey != local_device_pubkey
663 && record.public_invite.is_some()
664 && source
665 .as_ref()
666 .and_then(|source| record.session_for_send_source(source))
667 .is_some_and(is_one_way_bootstrap_session);
668
669 if should_refresh_local_sibling_bootstrap {
670 let public_invite = record
671 .public_invite
672 .clone()
673 .expect("checked public invite presence");
674 match public_invite.accept_with_owner_context(
675 ctx,
676 local_device_pubkey,
677 local_device_secret_key,
678 claimed_owner,
679 ) {
680 Ok((mut session, invite_response)) => {
681 let envelope = session
682 .apply_send(session.plan_send(payload, ctx.now)?)
683 .envelope;
684 record.invite_response_generated = true;
685 record.upsert_session(session, ctx.now);
686
687 return Ok(Some((
688 Delivery {
689 owner_pubkey,
690 device_pubkey,
691 envelope,
692 },
693 Some(invite_response),
694 )));
695 }
696 Err(Error::Domain(
697 DomainError::InviteAlreadyUsed | DomainError::InviteExhausted,
698 )) => {}
699 Err(error) => return Err(error),
700 }
701 }
702
703 if let Some(source) = source {
704 let plan = match source {
705 SendSessionSource::Active => record
706 .active_session
707 .as_ref()
708 .expect("active session must exist")
709 .plan_send(payload, ctx.now)?,
710 SendSessionSource::Inactive(index) => {
711 record.inactive_sessions[index].plan_send(payload, ctx.now)?
712 }
713 };
714
715 let envelope = match source {
716 SendSessionSource::Active => {
717 record
718 .active_session
719 .as_mut()
720 .expect("active session must exist")
721 .apply_send(plan)
722 .envelope
723 }
724 SendSessionSource::Inactive(index) => {
725 let mut session = record.inactive_sessions.remove(index);
726 let outcome = session.apply_send(plan);
727 record.upsert_session(session, ctx.now);
728 outcome.envelope
729 }
730 };
731
732 record.last_activity = Some(ctx.now);
733 return Ok(Some((
734 Delivery {
735 owner_pubkey,
736 device_pubkey,
737 envelope,
738 },
739 None,
740 )));
741 }
742
743 let Some(public_invite) = record.public_invite.clone() else {
744 return Ok(None);
745 };
746
747 let (mut session, invite_response) = match public_invite.accept_with_owner_context(
748 ctx,
749 local_device_pubkey,
750 local_device_secret_key,
751 claimed_owner,
752 ) {
753 Ok(result) => result,
754 Err(Error::Domain(DomainError::InviteAlreadyUsed | DomainError::InviteExhausted)) => {
755 return Ok(None)
756 }
757 Err(error) => return Err(error),
758 };
759 let envelope = session
760 .apply_send(session.plan_send(payload, ctx.now)?)
761 .envelope;
762 record.invite_response_generated = true;
763 record.upsert_session(session, ctx.now);
764
765 Ok(Some((
766 Delivery {
767 owner_pubkey,
768 device_pubkey,
769 envelope,
770 },
771 Some(invite_response),
772 )))
773 }
774
775 fn prepare_device_deliveries_for_all_send_sessions<R>(
776 &mut self,
777 ctx: &mut ProtocolContext<'_, R>,
778 owner_pubkey: OwnerPubkey,
779 device_pubkey: DevicePubkey,
780 payload: &[u8],
781 ) -> Result<Vec<Delivery>>
782 where
783 R: RngCore + CryptoRng,
784 {
785 let user = self.user_record_mut(owner_pubkey);
786 let record = user.device_record_mut(device_pubkey, ctx.now);
787
788 if !record.authorized || record.is_stale {
789 return Ok(Vec::new());
790 }
791
792 let mut deliveries = Vec::new();
793
794 if let Some(active_session) = record.active_session.as_mut() {
795 if active_session.can_send() {
796 let plan = active_session.plan_send(payload, ctx.now)?;
797 let envelope = active_session.apply_send(plan).envelope;
798 deliveries.push(Delivery {
799 owner_pubkey,
800 device_pubkey,
801 envelope,
802 });
803 }
804 }
805
806 let inactive_sessions = std::mem::take(&mut record.inactive_sessions);
807 for mut session in inactive_sessions {
808 if session.can_send() {
809 let plan = session.plan_send(payload, ctx.now)?;
810 let envelope = session.apply_send(plan).envelope;
811 deliveries.push(Delivery {
812 owner_pubkey,
813 device_pubkey,
814 envelope,
815 });
816 }
817 record.upsert_session(session, ctx.now);
818 }
819
820 if !deliveries.is_empty() {
821 record.last_activity = Some(ctx.now);
822 }
823
824 Ok(deliveries)
825 }
826
827 fn prepare_send_inner<R>(
828 &mut self,
829 ctx: &mut ProtocolContext<'_, R>,
830 recipient_owner: OwnerPubkey,
831 payload: Vec<u8>,
832 include_local_siblings: bool,
833 ) -> Result<PreparedSend>
834 where
835 R: RngCore + CryptoRng,
836 {
837 let mut relay_gaps = Vec::new();
838 let mut targets = BTreeSet::new();
839
840 self.collect_recipient_targets(recipient_owner, &mut targets, &mut relay_gaps);
841 if include_local_siblings {
842 self.collect_local_sibling_targets(&mut targets);
843 }
844
845 let mut deliveries = Vec::new();
846 let mut invite_responses = Vec::new();
847
848 for target in targets {
849 match self.prepare_device_delivery(
850 ctx,
851 target.owner_pubkey,
852 target.device_pubkey,
853 &payload,
854 false,
855 )? {
856 Some((delivery, maybe_response)) => {
857 deliveries.push(delivery);
858 if let Some(response) = maybe_response {
859 invite_responses.push(response);
860 }
861 }
862 None => {
863 relay_gaps.push(RelayGap::MissingDeviceInvite {
864 owner_pubkey: target.owner_pubkey,
865 device_pubkey: target.device_pubkey,
866 });
867 }
868 }
869 }
870
871 relay_gaps.sort();
872
873 Ok(PreparedSend {
874 recipient_owner,
875 payload,
876 deliveries,
877 invite_responses,
878 relay_gaps,
879 })
880 }
881
882 fn prepare_explicit_send<R>(
883 &mut self,
884 ctx: &mut ProtocolContext<'_, R>,
885 recipient_owner: OwnerPubkey,
886 targets: BTreeSet<TargetDevice>,
887 payload: Vec<u8>,
888 refresh_one_way_bootstrap: bool,
889 ) -> Result<PreparedSend>
890 where
891 R: RngCore + CryptoRng,
892 {
893 let mut deliveries = Vec::new();
894 let mut invite_responses = Vec::new();
895 let mut relay_gaps = Vec::new();
896
897 for target in targets {
898 match self.prepare_device_delivery(
899 ctx,
900 target.owner_pubkey,
901 target.device_pubkey,
902 &payload,
903 refresh_one_way_bootstrap,
904 )? {
905 Some((delivery, maybe_response)) => {
906 deliveries.push(delivery);
907 if let Some(response) = maybe_response {
908 invite_responses.push(response);
909 }
910 }
911 None => {
912 relay_gaps.push(RelayGap::MissingDeviceInvite {
913 owner_pubkey: target.owner_pubkey,
914 device_pubkey: target.device_pubkey,
915 });
916 }
917 }
918 }
919
920 relay_gaps.sort();
921
922 Ok(PreparedSend {
923 recipient_owner,
924 payload,
925 deliveries,
926 invite_responses,
927 relay_gaps,
928 })
929 }
930
931 fn collect_recipient_targets(
932 &self,
933 recipient_owner: OwnerPubkey,
934 targets: &mut BTreeSet<TargetDevice>,
935 relay_gaps: &mut Vec<RelayGap>,
936 ) {
937 let Some(user) = self.users.get(&recipient_owner) else {
938 relay_gaps.push(RelayGap::MissingRoster {
939 owner_pubkey: recipient_owner,
940 });
941 return;
942 };
943
944 if user
945 .roster
946 .as_ref()
947 .is_none_or(|roster| roster.devices().is_empty())
948 {
949 relay_gaps.push(RelayGap::MissingRoster {
950 owner_pubkey: recipient_owner,
951 });
952 return;
953 }
954
955 for device_pubkey in user.authorized_non_stale_devices() {
956 targets.insert(TargetDevice {
957 owner_pubkey: recipient_owner,
958 device_pubkey,
959 });
960 }
961 }
962
963 fn collect_local_sibling_targets(&self, targets: &mut BTreeSet<TargetDevice>) {
964 let Some(user) = self.users.get(&self.local_owner_pubkey) else {
965 return;
966 };
967
968 if user.roster.is_none() {
969 return;
970 }
971
972 for device_pubkey in user.authorized_non_stale_devices() {
973 if device_pubkey == self.local_device_pubkey {
974 continue;
975 }
976 targets.insert(TargetDevice {
977 owner_pubkey: self.local_owner_pubkey,
978 device_pubkey,
979 });
980 }
981 }
982
983 fn observe_public_invite(&mut self, owner_pubkey: OwnerPubkey, invite: Invite) -> Result<()> {
984 if let Some(inviter_owner_pubkey) = invite.inviter_owner_pubkey {
985 if inviter_owner_pubkey != owner_pubkey {
986 return Err(DomainError::InvalidState(format!(
987 "invite owner mismatch: expected {owner_pubkey}, got {inviter_owner_pubkey}"
988 ))
989 .into());
990 }
991 }
992
993 let device_pubkey = invite.inviter_device_pubkey;
994 let mut public_invite = invite;
995 public_invite.inviter_ephemeral_private_key = None;
996
997 let user = self.user_record_mut(owner_pubkey);
998 let record = user.device_record_mut(device_pubkey, public_invite.created_at);
999
1000 let should_replace_invite = record
1001 .public_invite
1002 .as_ref()
1003 .is_none_or(|existing| public_invite.created_at >= existing.created_at);
1004
1005 record.created_at = merge_created_at(record.created_at, public_invite.created_at);
1006 if should_replace_invite {
1007 record.public_invite = Some(public_invite);
1008 }
1009 Ok(())
1010 }
1011
1012 fn apply_roster_for_owner(
1013 &mut self,
1014 owner_pubkey: OwnerPubkey,
1015 incoming_roster: DeviceRoster,
1016 ) -> RosterSnapshotDecision {
1017 self.apply_roster_for_owner_inner(owner_pubkey, incoming_roster, false)
1018 }
1019
1020 fn apply_roster_for_owner_inner(
1021 &mut self,
1022 owner_pubkey: OwnerPubkey,
1023 incoming_roster: DeviceRoster,
1024 replace_existing: bool,
1025 ) -> RosterSnapshotDecision {
1026 let user = self.user_record_mut(owner_pubkey);
1027 let current_roster = user.roster.as_ref();
1028 let (decision, next_roster) = if replace_existing {
1029 (RosterSnapshotDecision::Advanced, incoming_roster)
1030 } else {
1031 apply_roster_snapshot(current_roster, &incoming_roster)
1032 };
1033
1034 let previous_authorized = current_roster
1035 .map(authorized_device_set)
1036 .unwrap_or_default();
1037 let next_authorized = authorized_device_set(&next_roster);
1038
1039 user.roster = Some(next_roster.clone());
1040
1041 for device in next_roster.devices() {
1042 let record = user.device_record_mut(device.device_pubkey, device.created_at);
1043 record.authorized = true;
1044 record.is_stale = false;
1045 record.stale_since = None;
1046 record.created_at = merge_created_at(record.created_at, device.created_at);
1047 }
1048
1049 for removed in previous_authorized.difference(&next_authorized) {
1050 let record = user.device_record_mut(*removed, next_roster.created_at);
1051 record.authorized = false;
1052 record.is_stale = true;
1053 if record.stale_since.is_none() {
1054 record.stale_since = Some(next_roster.created_at);
1055 }
1056 }
1057
1058 self.reconcile_verified_claimed_devices(owner_pubkey, &next_roster, next_roster.created_at);
1059
1060 decision
1061 }
1062
1063 fn reconcile_verified_claimed_devices(
1064 &mut self,
1065 owner_pubkey: OwnerPubkey,
1066 roster: &DeviceRoster,
1067 now: UnixSeconds,
1068 ) {
1069 let roster_devices = authorized_device_set(roster);
1070 if roster_devices.is_empty() {
1071 return;
1072 }
1073
1074 let source_owners: Vec<OwnerPubkey> = self
1075 .users
1076 .keys()
1077 .copied()
1078 .filter(|candidate_owner_pubkey| *candidate_owner_pubkey != owner_pubkey)
1079 .collect();
1080
1081 let mut migrated = Vec::new();
1082 let mut empty_sources = Vec::new();
1083
1084 for source_owner_pubkey in source_owners {
1085 let matching_devices = self
1086 .users
1087 .get(&source_owner_pubkey)
1088 .map(|user| {
1089 user.devices
1090 .values()
1091 .filter(|record| {
1092 if !roster_devices.contains(&record.device_pubkey) {
1093 return false;
1094 }
1095 if record.claimed_owner_pubkey == Some(owner_pubkey) {
1096 return true;
1097 }
1098 user.roster
1099 .as_ref()
1100 .and_then(|roster| roster.get_device(&record.device_pubkey))
1101 .is_none()
1102 })
1103 .map(|record| record.device_pubkey)
1104 .collect::<Vec<_>>()
1105 })
1106 .unwrap_or_default();
1107
1108 if matching_devices.is_empty() {
1109 continue;
1110 }
1111
1112 if let Some(user) = self.users.get_mut(&source_owner_pubkey) {
1113 let source_roster_is_provisional = user.roster.as_ref().is_some_and(|roster| {
1114 roster.devices().iter().all(|device| {
1115 matching_devices.contains(&device.device_pubkey)
1116 && crate::owner_pubkey_from_device_pubkey(device.device_pubkey)
1117 == source_owner_pubkey
1118 })
1119 });
1120
1121 for device_pubkey in matching_devices {
1122 if let Some(mut record) = user.devices.remove(&device_pubkey) {
1123 record.claimed_owner_pubkey = None;
1124 migrated.push(record);
1125 }
1126 }
1127
1128 if user.devices.is_empty()
1129 && (user.roster.is_none() || source_roster_is_provisional)
1130 {
1131 empty_sources.push(source_owner_pubkey);
1132 }
1133 }
1134 }
1135
1136 for source_owner_pubkey in empty_sources {
1137 self.users.remove(&source_owner_pubkey);
1138 }
1139
1140 if migrated.is_empty() {
1141 return;
1142 }
1143
1144 let user = self.user_record_mut(owner_pubkey);
1145 for record in migrated {
1146 let device_pubkey = record.device_pubkey;
1147 user.device_record_mut(device_pubkey, record.created_at)
1148 .absorb(record, now);
1149 }
1150 }
1151
1152 fn user_record_mut(&mut self, owner_pubkey: OwnerPubkey) -> &mut UserRecord {
1153 self.users
1154 .entry(owner_pubkey)
1155 .or_insert_with(|| UserRecord::new(owner_pubkey))
1156 }
1157}
1158
1159impl UserRecord {
1160 fn new(owner_pubkey: OwnerPubkey) -> Self {
1161 Self {
1162 owner_pubkey,
1163 roster: None,
1164 devices: BTreeMap::new(),
1165 }
1166 }
1167
1168 fn from_snapshot(snapshot: UserRecordSnapshot) -> Self {
1169 Self {
1170 owner_pubkey: snapshot.owner_pubkey,
1171 roster: snapshot.roster,
1172 devices: snapshot
1173 .devices
1174 .into_iter()
1175 .map(DeviceRecord::from_snapshot)
1176 .map(|record| (record.device_pubkey, record))
1177 .collect(),
1178 }
1179 }
1180
1181 fn snapshot(&self) -> UserRecordSnapshot {
1182 UserRecordSnapshot {
1183 owner_pubkey: self.owner_pubkey,
1184 roster: self.roster.clone(),
1185 devices: self.devices.values().map(DeviceRecord::snapshot).collect(),
1186 }
1187 }
1188
1189 fn device_record_mut(
1190 &mut self,
1191 device_pubkey: DevicePubkey,
1192 created_at: UnixSeconds,
1193 ) -> &mut DeviceRecord {
1194 self.devices
1195 .entry(device_pubkey)
1196 .or_insert_with(|| DeviceRecord::new(device_pubkey, created_at))
1197 }
1198
1199 fn authorized_non_stale_devices(&self) -> Vec<DevicePubkey> {
1200 self.devices
1201 .values()
1202 .filter(|record| record.authorized && !record.is_stale)
1203 .map(|record| record.device_pubkey)
1204 .collect()
1205 }
1206}
1207
1208impl DeviceRecord {
1209 fn new(device_pubkey: DevicePubkey, created_at: UnixSeconds) -> Self {
1210 Self {
1211 device_pubkey,
1212 authorized: false,
1213 is_stale: false,
1214 stale_since: None,
1215 claimed_owner_pubkey: None,
1216 public_invite: None,
1217 invite_response_generated: false,
1218 active_session: None,
1219 inactive_sessions: Vec::new(),
1220 last_activity: None,
1221 created_at,
1222 }
1223 }
1224
1225 fn from_snapshot(snapshot: DeviceRecordSnapshot) -> Self {
1226 Self {
1227 device_pubkey: snapshot.device_pubkey,
1228 authorized: snapshot.authorized,
1229 is_stale: snapshot.is_stale,
1230 stale_since: snapshot.stale_since,
1231 claimed_owner_pubkey: snapshot.claimed_owner_pubkey,
1232 public_invite: snapshot.public_invite,
1233 invite_response_generated: snapshot.invite_response_generated,
1234 active_session: snapshot.active_session.map(Session::from_state),
1235 inactive_sessions: snapshot
1236 .inactive_sessions
1237 .into_iter()
1238 .map(Session::from_state)
1239 .collect(),
1240 last_activity: snapshot.last_activity,
1241 created_at: snapshot.created_at,
1242 }
1243 }
1244
1245 fn snapshot(&self) -> DeviceRecordSnapshot {
1246 DeviceRecordSnapshot {
1247 device_pubkey: self.device_pubkey,
1248 authorized: self.authorized,
1249 is_stale: self.is_stale,
1250 stale_since: self.stale_since,
1251 claimed_owner_pubkey: self.claimed_owner_pubkey,
1252 public_invite: self.public_invite.clone(),
1253 invite_response_generated: self.invite_response_generated,
1254 active_session: self
1255 .active_session
1256 .as_ref()
1257 .map(|session| session.state.clone()),
1258 inactive_sessions: self
1259 .inactive_sessions
1260 .iter()
1261 .map(|session| session.state.clone())
1262 .collect(),
1263 last_activity: self.last_activity,
1264 created_at: self.created_at,
1265 }
1266 }
1267
1268 fn best_send_session_source(&self) -> Option<SendSessionSource> {
1269 let mut best: Option<(SendSessionSource, (u8, u32, u32))> = None;
1270
1271 if let Some(active_session) = self.active_session.as_ref() {
1272 if active_session.can_send() {
1273 best = Some((SendSessionSource::Active, session_priority(active_session)));
1274 }
1275 }
1276
1277 for (index, session) in self.inactive_sessions.iter().enumerate() {
1278 if !session.can_send() {
1279 continue;
1280 }
1281 let priority = session_priority(session);
1282 if best
1283 .as_ref()
1284 .is_none_or(|(_, current_priority)| priority > *current_priority)
1285 {
1286 best = Some((SendSessionSource::Inactive(index), priority));
1287 }
1288 }
1289
1290 best.map(|(source, _)| source)
1291 }
1292
1293 fn session_for_send_source(&self, source: &SendSessionSource) -> Option<&Session> {
1294 match source {
1295 SendSessionSource::Active => self.active_session.as_ref(),
1296 SendSessionSource::Inactive(index) => self.inactive_sessions.get(*index),
1297 }
1298 }
1299
1300 fn upsert_session(&mut self, session: Session, now: UnixSeconds) {
1301 if self.contains_state(&session.state) {
1302 self.compact_duplicate_sessions();
1303 self.last_activity = Some(now);
1304 return;
1305 }
1306
1307 let new_priority = session_priority(&session);
1308 let old_priority = self
1309 .active_session
1310 .as_ref()
1311 .map(session_priority)
1312 .unwrap_or((0, 0, 0));
1313
1314 if let Some(old_active) = self.active_session.take() {
1315 if old_priority >= new_priority {
1316 self.inactive_sessions.push(session);
1317 self.active_session = Some(old_active);
1318 } else {
1319 self.inactive_sessions.push(old_active);
1320 self.active_session = Some(session);
1321 }
1322 } else {
1323 self.active_session = Some(session);
1324 }
1325
1326 self.compact_duplicate_sessions();
1327 if self.inactive_sessions.len() > MAX_INACTIVE_SESSIONS {
1328 self.inactive_sessions.truncate(MAX_INACTIVE_SESSIONS);
1329 }
1330 self.last_activity = Some(now);
1331 }
1332
1333 fn absorb(&mut self, mut other: DeviceRecord, now: UnixSeconds) {
1334 self.authorized |= other.authorized;
1335 self.is_stale &= other.is_stale;
1336 self.stale_since = match (self.stale_since, other.stale_since) {
1337 (Some(existing), Some(incoming)) => Some(existing.min(incoming)),
1338 (None, incoming) => incoming,
1339 (existing, None) => existing,
1340 };
1341 self.claimed_owner_pubkey = self
1342 .claimed_owner_pubkey
1343 .or(other.claimed_owner_pubkey.take());
1344 self.created_at = merge_created_at(self.created_at, other.created_at);
1345
1346 if let Some(public_invite) = other.public_invite.take() {
1347 let should_replace_invite = self
1348 .public_invite
1349 .as_ref()
1350 .is_none_or(|existing| public_invite.created_at >= existing.created_at);
1351 if should_replace_invite {
1352 self.public_invite = Some(public_invite);
1353 }
1354 }
1355 self.invite_response_generated |= other.invite_response_generated;
1356
1357 if let Some(session) = other.active_session.take() {
1358 self.upsert_session(session, now);
1359 }
1360
1361 for session in other.inactive_sessions.drain(..) {
1362 self.upsert_session(session, now);
1363 }
1364
1365 self.last_activity = match (self.last_activity, other.last_activity) {
1366 (Some(existing), Some(incoming)) => Some(existing.max(incoming)),
1367 (None, incoming) => incoming,
1368 (existing, None) => existing,
1369 };
1370 }
1371
1372 fn promote_inactive_session(&mut self, session: Session) {
1373 let new_priority = session_priority(&session);
1374 if let Some(old_active) = self.active_session.take() {
1375 let old_priority = session_priority(&old_active);
1376 if new_priority > old_priority {
1377 if old_active.state != session.state {
1378 self.inactive_sessions.push(old_active);
1379 }
1380 self.active_session = Some(session);
1381 } else {
1382 self.inactive_sessions.push(session);
1383 self.active_session = Some(old_active);
1384 }
1385 } else {
1386 self.active_session = Some(session);
1387 }
1388 self.compact_duplicate_sessions();
1389 if self.inactive_sessions.len() > MAX_INACTIVE_SESSIONS {
1390 self.inactive_sessions.truncate(MAX_INACTIVE_SESSIONS);
1391 }
1392 }
1393
1394 fn contains_state(&self, state: &SessionState) -> bool {
1395 self.active_session
1396 .as_ref()
1397 .is_some_and(|session| session.state == *state)
1398 || self
1399 .inactive_sessions
1400 .iter()
1401 .any(|session| session.state == *state)
1402 }
1403
1404 fn compact_duplicate_sessions(&mut self) {
1405 let active_state = self
1406 .active_session
1407 .as_ref()
1408 .map(|session| session.state.clone());
1409 let mut unique_states = Vec::new();
1410 let mut inactive_sessions = Vec::with_capacity(self.inactive_sessions.len());
1411
1412 for session in self.inactive_sessions.drain(..) {
1413 let is_duplicate = active_state
1414 .as_ref()
1415 .is_some_and(|state| *state == session.state)
1416 || unique_states.contains(&session.state);
1417 if is_duplicate {
1418 continue;
1419 }
1420 unique_states.push(session.state.clone());
1421 inactive_sessions.push(session);
1422 }
1423
1424 self.inactive_sessions = inactive_sessions;
1425 }
1426}
1427
1428fn apply_roster_snapshot(
1429 current_roster: Option<&DeviceRoster>,
1430 incoming_roster: &DeviceRoster,
1431) -> (RosterSnapshotDecision, DeviceRoster) {
1432 let Some(current_roster) = current_roster else {
1433 return (RosterSnapshotDecision::Advanced, incoming_roster.clone());
1434 };
1435
1436 if incoming_roster.created_at > current_roster.created_at {
1437 return (RosterSnapshotDecision::Advanced, incoming_roster.clone());
1438 }
1439
1440 if incoming_roster.created_at < current_roster.created_at {
1441 return (RosterSnapshotDecision::Stale, current_roster.clone());
1442 }
1443
1444 (
1445 RosterSnapshotDecision::MergedEqualTimestamp,
1446 current_roster.merge(incoming_roster),
1447 )
1448}
1449
1450fn authorized_device_set(roster: &DeviceRoster) -> BTreeSet<DevicePubkey> {
1451 roster
1452 .devices()
1453 .iter()
1454 .map(|device| device.device_pubkey)
1455 .collect()
1456}
1457
1458fn session_priority(session: &Session) -> (u8, u32, u32) {
1459 let can_send = session.can_send();
1460 let can_receive = session.state.receiving_chain_key.is_some()
1461 || session.state.their_current_nostr_public_key.is_some()
1462 || session.state.receiving_chain_message_number > 0;
1463
1464 let directionality = match (can_send, can_receive) {
1465 (true, true) => 3,
1466 (true, false) => 2,
1467 (false, true) => 1,
1468 (false, false) => 0,
1469 };
1470
1471 (
1472 directionality,
1473 session.state.receiving_chain_message_number,
1474 session.state.sending_chain_message_number,
1475 )
1476}
1477
1478fn is_one_way_bootstrap_session(session: &Session) -> bool {
1479 session.state.receiving_chain_key.is_none()
1480 && session.state.their_current_nostr_public_key.is_none()
1481}
1482
1483fn merge_created_at(current: UnixSeconds, observed: UnixSeconds) -> UnixSeconds {
1484 match (current.get(), observed.get()) {
1485 (0, _) => observed,
1486 (_, 0) => current,
1487 _ => current.min(observed),
1488 }
1489}