1#![forbid(unsafe_code)]
11
12pub mod bridge_auth;
13pub mod bridge_handlers;
14pub mod dev_api;
15pub(crate) mod error;
16pub mod http;
17pub mod projection;
18pub mod tls;
19mod well_known;
20
21use std::collections::HashMap;
22use std::marker::PhantomData;
23use std::net::SocketAddr;
24use std::sync::Arc;
25use std::time::Duration;
26
27use scp_core::store::{CURRENT_STORE_VERSION, ProtocolStore, StoredValue};
28use scp_identity::document::DidDocument;
29use scp_identity::{DidMethod, IdentityError, ScpIdentity};
30use scp_platform::EncryptedStorage;
31use scp_platform::traits::{KeyCustody, Storage};
32use scp_transport::nat::{NatTierChange, NetworkChangeDetector};
33use scp_transport::native::server::{RelayConfig, RelayError, RelayServer, ShutdownHandle};
34use scp_transport::native::storage::BlobStorageBackend;
35use tokio_util::sync::CancellationToken;
36use zeroize::Zeroizing;
37
38pub use http::BroadcastContext;
39pub use projection::ProjectedContext;
40
41pub const DEFAULT_HTTP_BIND_ADDR: SocketAddr =
58 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 8443);
59
60pub(crate) const MAX_BROADCAST_CONTEXTS: usize = 1024;
70
71pub const DEFAULT_PROJECTION_RATE_LIMIT: u32 = 60;
78
79#[derive(Debug, thiserror::Error)]
85pub enum NodeError {
86 #[error("missing required field: {0}")]
88 MissingField(&'static str),
89
90 #[error("identity error: {0}")]
92 Identity(#[from] IdentityError),
93
94 #[error("relay error: {0}")]
96 Relay(#[from] RelayError),
97
98 #[error("storage error: {0}")]
100 Storage(String),
101
102 #[error("invalid configuration: {0}")]
104 InvalidConfig(String),
105
106 #[error("serve error: {0}")]
108 Serve(String),
109
110 #[error("NAT traversal error: {0}")]
112 Nat(String),
113
114 #[error("TLS error: {0}")]
116 Tls(#[from] tls::TlsError),
117}
118
119#[derive(Debug)]
129pub struct RelayHandle {
130 bound_addr: SocketAddr,
132 shutdown_handle: ShutdownHandle,
134}
135
136impl RelayHandle {
137 #[must_use]
139 pub const fn bound_addr(&self) -> SocketAddr {
140 self.bound_addr
141 }
142
143 #[must_use]
145 pub const fn shutdown_handle(&self) -> &ShutdownHandle {
146 &self.shutdown_handle
147 }
148}
149
150#[derive(Debug)]
160pub struct IdentityHandle {
161 identity: ScpIdentity,
163 document: DidDocument,
165}
166
167impl IdentityHandle {
168 #[must_use]
170 pub const fn identity(&self) -> &ScpIdentity {
171 &self.identity
172 }
173
174 #[must_use]
176 pub fn did(&self) -> &str {
177 &self.identity.did
178 }
179
180 #[must_use]
182 pub const fn document(&self) -> &DidDocument {
183 &self.document
184 }
185}
186
187pub struct ApplicationNode<S: Storage> {
206 domain: Option<String>,
208 relay: RelayHandle,
210 identity: IdentityHandle,
212 storage: Arc<ProtocolStore<S>>,
214 state: Arc<http::NodeState>,
216 tier_reeval: Option<TierReEvalHandle>,
219 tier_change_rx: Option<tokio::sync::mpsc::Receiver<NatTierChange>>,
221 #[cfg(feature = "http3")]
224 http3_config: Option<scp_transport::http3::Http3Config>,
225}
226
227impl<S: Storage + std::fmt::Debug> std::fmt::Debug for ApplicationNode<S> {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("ApplicationNode")
230 .field("domain", &self.domain)
231 .field("relay", &self.relay)
232 .field("identity", &self.identity)
233 .field("storage", &"<Storage>")
234 .field(
235 "tier_reeval",
236 &self.tier_reeval.as_ref().map(|_| "<active>"),
237 )
238 .finish_non_exhaustive()
239 }
240}
241
242impl<S: Storage> ApplicationNode<S> {
243 #[must_use]
247 pub fn domain(&self) -> Option<&str> {
248 self.domain.as_deref()
249 }
250
251 #[must_use]
253 pub const fn relay(&self) -> &RelayHandle {
254 &self.relay
255 }
256
257 #[must_use]
259 pub const fn identity(&self) -> &IdentityHandle {
260 &self.identity
261 }
262
263 #[must_use]
265 pub fn storage(&self) -> &ProtocolStore<S> {
266 &self.storage
267 }
268
269 #[must_use]
274 pub fn relay_url(&self) -> &str {
275 &self.state.relay_url
276 }
277
278 #[must_use]
288 pub fn cert_resolver(&self) -> Option<&Arc<tls::CertResolver>> {
289 self.state.cert_resolver.as_ref()
290 }
291
292 pub async fn register_broadcast_context(
309 &self,
310 id: String,
311 name: Option<String>,
312 ) -> Result<(), NodeError> {
313 if id.is_empty() || id.len() > 64 {
315 return Err(NodeError::InvalidConfig(
316 "context id must be 1-64 hex characters".into(),
317 ));
318 }
319 if !id.bytes().all(|b| b.is_ascii_hexdigit()) {
320 return Err(NodeError::InvalidConfig(
321 "context id must contain only hex characters".into(),
322 ));
323 }
324 let id = id.to_ascii_lowercase();
325 let mut contexts = self.state.broadcast_contexts.write().await;
326 if !contexts.contains_key(&id) && contexts.len() >= MAX_BROADCAST_CONTEXTS {
327 return Err(NodeError::InvalidConfig(format!(
328 "broadcast context limit ({MAX_BROADCAST_CONTEXTS}) reached",
329 )));
330 }
331 contexts.insert(id.clone(), BroadcastContext { id, name });
332 drop(contexts);
333 Ok(())
334 }
335
336 #[must_use]
345 pub fn bridge_token_hex(&self) -> String {
346 scp_transport::native::server::hex_encode_32(&self.state.bridge_secret)
347 }
348
349 #[must_use]
356 pub fn dev_token(&self) -> Option<&str> {
357 self.state.dev_token.as_deref()
358 }
359
360 pub fn shutdown(&self) {
370 self.relay.shutdown_handle.shutdown();
371 self.state.shutdown_token.cancel();
372 if let Some(ref handle) = self.tier_reeval {
373 handle.stop();
374 }
375 }
376
377 pub const fn tier_change_rx(
384 &mut self,
385 ) -> Option<&mut tokio::sync::mpsc::Receiver<NatTierChange>> {
386 self.tier_change_rx.as_mut()
387 }
388
389 const MAX_PROJECTED_CONTEXTS: usize = 1024;
391
392 pub async fn enable_broadcast_projection(
426 &self,
427 context_id: &str,
428 broadcast_key: scp_core::crypto::sender_keys::BroadcastKey,
429 admission: scp_core::context::broadcast::BroadcastAdmission,
430 projection_policy: Option<scp_core::context::params::ProjectionPolicy>,
431 ) -> Result<(), NodeError> {
432 projection::validate_projection_policy(admission, projection_policy.as_ref())
434 .map_err(NodeError::InvalidConfig)?;
435
436 let routing_id = projection::compute_routing_id(context_id);
437 let mut registry = self.state.projected_contexts.write().await;
438 if let Some(existing) = registry.get_mut(&routing_id) {
439 existing.insert_key(broadcast_key);
440 existing.admission = admission;
441 existing.projection_policy = projection_policy;
442 } else {
443 if registry.len() >= Self::MAX_PROJECTED_CONTEXTS {
444 return Err(NodeError::InvalidConfig(format!(
445 "projected context limit ({}) reached",
446 Self::MAX_PROJECTED_CONTEXTS
447 )));
448 }
449 let projected =
450 ProjectedContext::new(context_id, broadcast_key, admission, projection_policy);
451 registry.insert(routing_id, projected);
452 }
453 drop(registry);
454 Ok(())
455 }
456
457 pub async fn disable_broadcast_projection(&self, context_id: &str) {
465 let routing_id = projection::compute_routing_id(context_id);
466 let mut registry = self.state.projected_contexts.write().await;
467 registry.remove(&routing_id);
468 }
469
470 pub async fn propagate_ban_keys(
488 &self,
489 context_id: &str,
490 ban_result: &scp_core::context::broadcast::GovernanceBanResult,
491 ) {
492 use scp_core::context::governance::RevocationScope;
493
494 let routing_id = projection::compute_routing_id(context_id);
495 let mut registry = self.state.projected_contexts.write().await;
496 if let Some(projected) = registry.get_mut(&routing_id) {
497 for rotation in &ban_result.rotated_authors {
499 projected.insert_key(rotation.new_key.clone());
500 }
501
502 if ban_result.scope == RevocationScope::Full {
507 let new_epochs: std::collections::HashSet<u64> = ban_result
508 .rotated_authors
509 .iter()
510 .map(|r| r.new_epoch)
511 .collect();
512 projected.retain_only_epochs(&new_epochs);
513 }
514 }
515 }
516}
517
518#[must_use]
522pub fn builder() -> ApplicationNodeBuilder {
523 ApplicationNodeBuilder::new()
524}
525
526const IDENTITY_STORAGE_KEY: &str = "scp/identity";
538
539#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
561struct PersistedIdentity {
562 identity: ScpIdentity,
563 document: DidDocument,
564}
565
566enum IdentitySource<K: KeyCustody, D: DidMethod> {
572 Generate {
574 key_custody: Arc<K>,
575 did_method: Arc<D>,
576 },
577 Explicit(Box<ExplicitIdentity<D>>),
580}
581
582struct ExplicitIdentity<D: DidMethod> {
584 identity: ScpIdentity,
585 document: DidDocument,
586 did_method: Arc<D>,
587}
588
589pub struct NoDomain;
595pub struct HasDomain;
597pub struct HasNoDomain;
599
600pub struct NoIdentity;
602pub struct HasIdentity;
604
605#[derive(Debug, Clone, PartialEq, Eq)]
615pub enum ReachabilityTier {
616 Upnp {
619 external_addr: SocketAddr,
621 },
622 Stun {
625 external_addr: SocketAddr,
627 },
628 Bridge {
631 bridge_url: String,
633 },
634}
635
636pub trait NatStrategy: Send + Sync {
642 fn select_tier(
650 &self,
651 relay_port: u16,
652 ) -> std::pin::Pin<
653 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
654 >;
655}
656
657const DEFAULT_STUN_ENDPOINTS: &[(&str, &str)] = &[
664 ("74.125.250.129:19302", "stun1.l.google.com"),
665 ("64.233.163.127:19302", "stun2.l.google.com"),
666];
667
668pub struct DefaultNatStrategy {
686 stun_server: Option<String>,
688 bridge_relay: Option<String>,
690 port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
692 reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
696}
697
698impl DefaultNatStrategy {
699 #[must_use]
701 pub fn new(stun_server: Option<String>, bridge_relay: Option<String>) -> Self {
702 Self {
703 stun_server,
704 bridge_relay,
705 port_mapper: None,
706 reachability_probe: None,
707 }
708 }
709
710 #[must_use]
712 pub fn with_port_mapper(mut self, mapper: Arc<dyn scp_transport::nat::PortMapper>) -> Self {
713 self.port_mapper = Some(mapper);
714 self
715 }
716
717 #[must_use]
722 pub fn with_reachability_probe(
723 mut self,
724 probe: Arc<dyn scp_transport::nat::ReachabilityProbe>,
725 ) -> Self {
726 self.reachability_probe = Some(probe);
727 self
728 }
729
730 fn build_stun_endpoints(&self) -> Result<Vec<scp_transport::nat::StunEndpoint>, NodeError> {
732 use scp_transport::nat::StunEndpoint;
733 if let Some(ref override_url) = self.stun_server {
734 let addr: SocketAddr = override_url.parse().map_err(|e| {
735 NodeError::Nat(format!("invalid STUN server address '{override_url}': {e}"))
736 })?;
737 Ok(vec![StunEndpoint {
738 addr,
739 label: override_url.clone(),
740 }])
741 } else {
742 Ok(DEFAULT_STUN_ENDPOINTS
743 .iter()
744 .map(|(addr_str, label)| {
745 #[allow(clippy::expect_used)]
748 let addr: SocketAddr = addr_str
749 .parse()
750 .expect("DEFAULT_STUN_ENDPOINTS contains valid SocketAddr literals");
751 StunEndpoint {
752 addr,
753 label: (*label).to_owned(),
754 }
755 })
756 .collect())
757 }
758 }
759
760 async fn try_tier1_upnp(
765 &self,
766 relay_port: u16,
767 socket: &tokio::net::UdpSocket,
768 probe: &dyn scp_transport::nat::ReachabilityProbe,
769 ) -> Option<ReachabilityTier> {
770 let mapper = self.port_mapper.as_ref()?;
771 tracing::info!("attempting Tier 1 UPnP/NAT-PMP port mapping");
772 match mapper.map_port(relay_port).await {
773 Ok(mapping) => {
774 tracing::info!(
775 protocol = %mapping.protocol,
776 external_addr = %mapping.external_addr,
777 "UPnP port mapping acquired, running reachability self-test"
778 );
779 let reachable = probe
780 .probe_reachability(socket, mapping.external_addr)
781 .await
782 .unwrap_or(false);
783
784 if reachable {
785 tracing::info!(
786 external_addr = %mapping.external_addr,
787 "Tier 1 reachability self-test passed"
788 );
789 return Some(ReachabilityTier::Upnp {
790 external_addr: mapping.external_addr,
791 });
792 }
793 tracing::warn!("Tier 1 reachability self-test failed, falling through to Tier 2");
794 None
795 }
796 Err(e) => {
797 tracing::warn!(
798 error = %e,
799 "UPnP port mapping failed, falling through to Tier 2"
800 );
801 None
802 }
803 }
804 }
805}
806
807impl NatStrategy for DefaultNatStrategy {
808 fn select_tier(
809 &self,
810 relay_port: u16,
811 ) -> std::pin::Pin<
812 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
813 > {
814 Box::pin(async move {
815 use scp_transport::nat::{DefaultReachabilityProbe, NatProber, ReachabilityProbe};
816
817 let endpoints = self.build_stun_endpoints()?;
819
820 let probe: Arc<dyn ReachabilityProbe> = if let Some(ref p) = self.reachability_probe {
824 Arc::clone(p)
825 } else {
826 Arc::new(DefaultReachabilityProbe::new(endpoints[0].addr, None))
827 };
828
829 let socket = tokio::net::UdpSocket::bind("0.0.0.0:0")
832 .await
833 .map_err(|e| {
834 NodeError::Nat(format!("failed to bind UDP socket for NAT probing: {e}"))
835 })?;
836
837 let prober = NatProber::new(endpoints, None)
838 .map_err(|e| NodeError::Nat(format!("failed to create NAT prober: {e}")))?;
839
840 let probe_result = prober
842 .probe_with_socket(&socket)
843 .await
844 .map_err(|e| NodeError::Nat(format!("NAT probing failed: {e}")))?;
845
846 tracing::info!(
847 nat_type = %probe_result.nat_type,
848 external_addr = ?probe_result.external_addr,
849 "NAT type probed"
850 );
851
852 if let Some(tier) = self.try_tier1_upnp(relay_port, &socket, &*probe).await {
854 return Ok(tier);
855 }
856
857 if probe_result.nat_type.is_hole_punchable()
860 && let Some(external_addr) = probe_result.external_addr
861 {
862 tracing::info!(
863 external_addr = %external_addr,
864 "attempting Tier 2 STUN, running reachability self-test"
865 );
866 let reachable = probe
867 .probe_reachability(&socket, external_addr)
868 .await
869 .unwrap_or(false);
870
871 if reachable {
872 tracing::info!(
873 external_addr = %external_addr,
874 "Tier 2 reachability self-test passed"
875 );
876 return Ok(ReachabilityTier::Stun { external_addr });
877 }
878
879 tracing::warn!("Tier 2 reachability self-test failed, falling through to Tier 3");
880 }
881
882 if let Some(ref bridge_url) = self.bridge_relay {
884 return Ok(ReachabilityTier::Bridge {
885 bridge_url: bridge_url.clone(),
886 });
887 }
888
889 Err(NodeError::Nat(
890 "all reachability tiers failed: NAT is symmetric and no bridge relay configured"
891 .into(),
892 ))
893 })
894 }
895}
896
897pub trait TlsProvider: Send + Sync {
907 fn provision(
912 &self,
913 ) -> std::pin::Pin<
914 Box<
915 dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
916 + Send
917 + '_,
918 >,
919 >;
920
921 fn challenges(&self) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
935 Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()))
936 }
937
938 fn needs_challenge_listener(&self) -> bool {
945 false
946 }
947}
948
949impl<S: Storage + 'static> TlsProvider for tls::AcmeProvider<S> {
951 fn provision(
952 &self,
953 ) -> std::pin::Pin<
954 Box<
955 dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
956 + Send
957 + '_,
958 >,
959 > {
960 Box::pin(self.load_or_provision())
961 }
962
963 fn challenges(&self) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
964 self.challenges()
965 }
966
967 fn needs_challenge_listener(&self) -> bool {
968 true
969 }
970}
971
972pub(crate) trait DidPublisher: Send + Sync {
984 fn publish<'a>(
986 &'a self,
987 identity: &'a ScpIdentity,
988 document: &'a DidDocument,
989 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>>;
990}
991
992struct DidMethodPublisher<D: DidMethod> {
994 inner: Arc<D>,
995}
996
997impl<D: DidMethod + 'static> DidPublisher for DidMethodPublisher<D> {
998 fn publish<'a>(
999 &'a self,
1000 identity: &'a ScpIdentity,
1001 document: &'a DidDocument,
1002 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>>
1003 {
1004 Box::pin(self.inner.publish(identity, document))
1005 }
1006}
1007
1008const TIER_REEVALUATION_INTERVAL: Duration = Duration::from_secs(30 * 60);
1014
1015struct TierReEvalHandle {
1021 task: tokio::task::JoinHandle<()>,
1024 cancel_tx: tokio::sync::watch::Sender<bool>,
1026}
1027
1028impl TierReEvalHandle {
1029 fn stop(&self) {
1031 let _ = self.cancel_tx.send(true);
1032 }
1033}
1034
1035impl Drop for TierReEvalHandle {
1036 fn drop(&mut self) {
1037 if self.cancel_tx.send(true).is_err() {
1041 self.task.abort();
1042 }
1043 }
1044}
1045
1046fn tier_to_relay_url(tier: &ReachabilityTier) -> String {
1048 match tier {
1049 ReachabilityTier::Upnp { external_addr } | ReachabilityTier::Stun { external_addr } => {
1050 format!("ws://{external_addr}/scp/v1")
1051 }
1052 ReachabilityTier::Bridge { bridge_url } => bridge_url.clone(),
1053 }
1054}
1055
1056async fn apply_tier_change(
1060 current_url: &str,
1061 new_relay_url: &str,
1062 trigger_reason: &str,
1063 current_doc: &DidDocument,
1064 publisher: &dyn DidPublisher,
1065 identity: &ScpIdentity,
1066 event_tx: Option<&tokio::sync::mpsc::Sender<NatTierChange>>,
1067) -> Option<(String, DidDocument)> {
1068 let mut updated_doc = current_doc.clone();
1069 for svc in &mut updated_doc.service {
1070 if svc.service_type == "SCPRelay" && svc.service_endpoint == current_url {
1071 new_relay_url.clone_into(&mut svc.service_endpoint);
1072 }
1073 }
1074 match publisher.publish(identity, &updated_doc).await {
1075 Ok(()) => {
1076 if let Some(tx) = event_tx {
1080 let _ = tx
1081 .send(NatTierChange::TierChanged {
1082 previous_relay_url: current_url.to_owned(),
1083 new_relay_url: new_relay_url.to_owned(),
1084 reason: trigger_reason.to_owned(),
1085 })
1086 .await;
1087 }
1088 tracing::info!(new_url = %new_relay_url, did = %identity.did,
1089 "DID document republished with new relay URL");
1090 Some((new_relay_url.to_owned(), updated_doc))
1091 }
1092 Err(e) => {
1093 tracing::warn!(error = %e, "DID document republish failed after tier change");
1094 None
1095 }
1096 }
1097}
1098
1099#[allow(clippy::too_many_arguments)]
1109fn spawn_tier_reevaluation(
1110 nat_strategy: Arc<dyn NatStrategy>,
1111 network_detector: Option<Arc<dyn NetworkChangeDetector>>,
1112 publisher: Arc<dyn DidPublisher>,
1113 identity: ScpIdentity,
1114 document: DidDocument,
1115 relay_port: u16,
1116 current_relay_url: String,
1117 event_tx: Option<tokio::sync::mpsc::Sender<NatTierChange>>,
1118 reevaluation_interval: Duration,
1119) -> TierReEvalHandle {
1120 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
1121 let task = tokio::spawn(async move {
1122 let mut current_url = current_relay_url;
1123 let mut current_doc = document;
1124 loop {
1125 let trigger_reason = tokio::select! {
1126 () = tokio::time::sleep(reevaluation_interval) => {
1127 "periodic 30-minute re-evaluation (§10.12.1)"
1128 }
1129 result = async {
1130 match network_detector.as_ref() {
1131 Some(d) => d.wait_for_change().await,
1132 None => std::future::pending().await,
1133 }
1134 } => {
1135 match result {
1136 Ok(()) => "network change event detected",
1137 Err(e) => {
1138 tracing::warn!(error = %e, "network change detector error");
1139 continue;
1140 }
1141 }
1142 }
1143 result = cancel_rx.changed() => {
1144 if result.is_err() || *cancel_rx.borrow() { return; }
1147 continue;
1148 }
1149 };
1150 tracing::debug!(reason = trigger_reason, "tier re-evaluation triggered");
1151 let new_tier = match nat_strategy.select_tier(relay_port).await {
1152 Ok(tier) => tier,
1153 Err(e) => {
1154 tracing::warn!(error = %e, "tier re-evaluation failed, keeping current tier");
1155 continue;
1156 }
1157 };
1158 let new_relay_url = tier_to_relay_url(&new_tier);
1159 if new_relay_url == current_url {
1160 tracing::debug!(relay_url = %current_url, "tier re-evaluation: no change");
1161 continue;
1162 }
1163 tracing::info!(
1164 previous_url = %current_url, new_url = %new_relay_url,
1165 tier = ?new_tier, reason = trigger_reason,
1166 "reachability tier changed, updating DID document (§10.12.1)"
1167 );
1168 if let Some((url, doc)) = apply_tier_change(
1169 ¤t_url,
1170 &new_relay_url,
1171 trigger_reason,
1172 ¤t_doc,
1173 &*publisher,
1174 &identity,
1175 event_tx.as_ref(),
1176 )
1177 .await
1178 {
1179 current_url = url;
1180 current_doc = doc;
1181 }
1182 }
1183 });
1184 TierReEvalHandle { task, cancel_tx }
1185}
1186
1187pub struct ApplicationNodeBuilder<
1214 K: KeyCustody = NoOpCustody,
1215 D: DidMethod = NoOpDidMethod,
1216 S: Storage = NoOpStorage,
1217 Dom = NoDomain,
1218 Id = NoIdentity,
1219> {
1220 domain: Option<String>,
1221 identity_source: Option<IdentitySource<K, D>>,
1222 storage: Option<S>,
1223 blob_storage: Option<BlobStorageBackend>,
1224 bind_addr: Option<SocketAddr>,
1225 acme_email: Option<String>,
1226 stun_server: Option<String>,
1228 bridge_relay: Option<String>,
1230 nat_strategy: Option<Arc<dyn NatStrategy>>,
1232 port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
1234 reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
1236 tls_provider: Option<Arc<dyn TlsProvider>>,
1238 network_detector: Option<Arc<dyn NetworkChangeDetector>>,
1241 local_api_addr: Option<SocketAddr>,
1243 http_bind_addr: Option<SocketAddr>,
1247 cors_origins: Option<Vec<String>>,
1250 projection_rate_limit: Option<u32>,
1254 #[cfg(feature = "http3")]
1256 http3_config: Option<scp_transport::http3::Http3Config>,
1257 persist_identity: bool,
1261 _domain_state: PhantomData<Dom>,
1262 _identity_state: PhantomData<Id>,
1263}
1264
1265impl ApplicationNodeBuilder {
1266 #[must_use]
1271 pub fn new() -> Self {
1272 Self {
1273 domain: None,
1274 identity_source: None,
1275 storage: None,
1276 blob_storage: Some(BlobStorageBackend::default()),
1277 bind_addr: None,
1278 acme_email: None,
1279 stun_server: None,
1280 bridge_relay: None,
1281 nat_strategy: None,
1282 port_mapper: None,
1283 reachability_probe: None,
1284 tls_provider: None,
1285 network_detector: None,
1286 local_api_addr: None,
1287 http_bind_addr: None,
1288 cors_origins: None,
1289 projection_rate_limit: None,
1290 #[cfg(feature = "http3")]
1291 http3_config: None,
1292 persist_identity: false,
1293 _domain_state: PhantomData,
1294 _identity_state: PhantomData,
1295 }
1296 }
1297}
1298
1299impl Default
1300 for ApplicationNodeBuilder<NoOpCustody, NoOpDidMethod, NoOpStorage, NoDomain, NoIdentity>
1301{
1302 fn default() -> Self {
1303 Self::new()
1304 }
1305}
1306
1307impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Id>
1308 ApplicationNodeBuilder<K, D, S, NoDomain, Id>
1309{
1310 #[must_use]
1316 pub fn domain(self, domain: &str) -> ApplicationNodeBuilder<K, D, S, HasDomain, Id> {
1317 ApplicationNodeBuilder {
1318 domain: Some(domain.to_owned()),
1319 identity_source: self.identity_source,
1320 storage: self.storage,
1321 blob_storage: self.blob_storage,
1322 bind_addr: self.bind_addr,
1323 acme_email: self.acme_email,
1324 stun_server: self.stun_server,
1325 bridge_relay: self.bridge_relay,
1326 nat_strategy: self.nat_strategy,
1327 port_mapper: self.port_mapper,
1328 reachability_probe: self.reachability_probe,
1329 tls_provider: self.tls_provider,
1330 network_detector: self.network_detector,
1331 local_api_addr: self.local_api_addr,
1332 http_bind_addr: self.http_bind_addr,
1333 cors_origins: self.cors_origins,
1334 projection_rate_limit: self.projection_rate_limit,
1335 #[cfg(feature = "http3")]
1336 http3_config: self.http3_config,
1337 persist_identity: self.persist_identity,
1338 _domain_state: PhantomData,
1339 _identity_state: PhantomData,
1340 }
1341 }
1342
1343 #[must_use]
1353 pub fn no_domain(self) -> ApplicationNodeBuilder<K, D, S, HasNoDomain, Id> {
1354 ApplicationNodeBuilder {
1355 domain: None,
1356 identity_source: self.identity_source,
1357 storage: self.storage,
1358 blob_storage: self.blob_storage,
1359 bind_addr: self.bind_addr,
1360 acme_email: self.acme_email,
1361 stun_server: self.stun_server,
1362 bridge_relay: self.bridge_relay,
1363 nat_strategy: self.nat_strategy,
1364 port_mapper: self.port_mapper,
1365 reachability_probe: self.reachability_probe,
1366 tls_provider: self.tls_provider,
1367 network_detector: self.network_detector,
1368 local_api_addr: self.local_api_addr,
1369 http_bind_addr: self.http_bind_addr,
1370 cors_origins: self.cors_origins,
1371 projection_rate_limit: self.projection_rate_limit,
1372 #[cfg(feature = "http3")]
1373 http3_config: self.http3_config,
1374 persist_identity: self.persist_identity,
1375 _domain_state: PhantomData,
1376 _identity_state: PhantomData,
1377 }
1378 }
1379}
1380
1381impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Dom, Id>
1382 ApplicationNodeBuilder<K, D, S, Dom, Id>
1383{
1384 #[must_use]
1388 pub const fn bind_addr(mut self, addr: SocketAddr) -> Self {
1389 self.bind_addr = Some(addr);
1390 self
1391 }
1392
1393 #[must_use]
1401 pub fn acme_email(mut self, email: &str) -> Self {
1402 self.acme_email = Some(email.to_owned());
1403 self
1404 }
1405
1406 #[must_use]
1411 pub fn stun_server(mut self, url: &str) -> Self {
1412 self.stun_server = Some(url.to_owned());
1413 self
1414 }
1415
1416 #[must_use]
1421 pub fn bridge_relay(mut self, url: &str) -> Self {
1422 self.bridge_relay = Some(url.to_owned());
1423 self
1424 }
1425
1426 #[must_use]
1431 pub fn nat_strategy(mut self, strategy: Arc<dyn NatStrategy>) -> Self {
1432 self.nat_strategy = Some(strategy);
1433 self
1434 }
1435
1436 #[must_use]
1442 pub fn port_mapper(mut self, mapper: Arc<dyn scp_transport::nat::PortMapper>) -> Self {
1443 self.port_mapper = Some(mapper);
1444 self
1445 }
1446
1447 #[must_use]
1456 pub fn reachability_probe(
1457 mut self,
1458 probe: Arc<dyn scp_transport::nat::ReachabilityProbe>,
1459 ) -> Self {
1460 self.reachability_probe = Some(probe);
1461 self
1462 }
1463
1464 #[must_use]
1470 pub fn tls_provider(mut self, provider: Arc<dyn TlsProvider>) -> Self {
1471 self.tls_provider = Some(provider);
1472 self
1473 }
1474
1475 #[must_use]
1486 pub fn network_detector(mut self, detector: Arc<dyn NetworkChangeDetector>) -> Self {
1487 self.network_detector = Some(detector);
1488 self
1489 }
1490
1491 #[must_use]
1505 pub fn local_api(mut self, addr: SocketAddr) -> Self {
1506 assert!(
1507 addr.ip().is_loopback(),
1508 "dev API bind address must be loopback (127.0.0.1 or ::1), got {addr}"
1509 );
1510 self.local_api_addr = Some(addr);
1511 self
1512 }
1513
1514 #[must_use]
1527 pub const fn http_bind_addr(mut self, addr: SocketAddr) -> Self {
1528 self.http_bind_addr = Some(addr);
1529 self
1530 }
1531
1532 #[must_use]
1551 pub fn cors_origins(mut self, origins: Vec<String>) -> Self {
1552 self.cors_origins = if origins.is_empty() {
1553 None
1554 } else {
1555 Some(origins)
1556 };
1557 self
1558 }
1559
1560 #[must_use]
1570 pub const fn projection_rate_limit(mut self, rate: u32) -> Self {
1571 self.projection_rate_limit = Some(rate);
1572 self
1573 }
1574
1575 #[cfg(feature = "http3")]
1584 #[must_use]
1585 pub fn http3(mut self, config: scp_transport::http3::Http3Config) -> Self {
1586 self.http3_config = Some(config);
1587 self
1588 }
1589}
1590
1591impl<K: KeyCustody + 'static, D: DidMethod + 'static, Dom, Id>
1592 ApplicationNodeBuilder<K, D, NoOpStorage, Dom, Id>
1593{
1594 pub fn storage<S2: Storage + 'static>(
1598 self,
1599 storage: S2,
1600 ) -> ApplicationNodeBuilder<K, D, S2, Dom, Id> {
1601 ApplicationNodeBuilder {
1602 domain: self.domain,
1603 identity_source: self.identity_source,
1604 storage: Some(storage),
1605 blob_storage: self.blob_storage,
1606 bind_addr: self.bind_addr,
1607 acme_email: self.acme_email,
1608 stun_server: self.stun_server,
1609 bridge_relay: self.bridge_relay,
1610 nat_strategy: self.nat_strategy,
1611 port_mapper: self.port_mapper,
1612 reachability_probe: self.reachability_probe,
1613 tls_provider: self.tls_provider,
1614 network_detector: self.network_detector,
1615 local_api_addr: self.local_api_addr,
1616 http_bind_addr: self.http_bind_addr,
1617 cors_origins: self.cors_origins,
1618 projection_rate_limit: self.projection_rate_limit,
1619 #[cfg(feature = "http3")]
1620 http3_config: self.http3_config,
1621 persist_identity: self.persist_identity,
1622 _domain_state: PhantomData,
1623 _identity_state: PhantomData,
1624 }
1625 }
1626}
1627
1628impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Dom, Id>
1629 ApplicationNodeBuilder<K, D, S, Dom, Id>
1630{
1631 #[must_use]
1636 pub fn blob_storage(mut self, blob_storage: impl Into<BlobStorageBackend>) -> Self {
1637 self.blob_storage = Some(blob_storage.into());
1638 self
1639 }
1640}
1641
1642impl<S: Storage + 'static, Dom>
1643 ApplicationNodeBuilder<NoOpCustody, NoOpDidMethod, S, Dom, NoIdentity>
1644{
1645 pub fn identity<D2: DidMethod + 'static>(
1650 self,
1651 identity: ScpIdentity,
1652 document: DidDocument,
1653 did_method: Arc<D2>,
1654 ) -> ApplicationNodeBuilder<NoOpCustody, D2, S, Dom, HasIdentity> {
1655 ApplicationNodeBuilder {
1656 domain: self.domain,
1657 identity_source: Some(IdentitySource::Explicit(Box::new(ExplicitIdentity {
1658 identity,
1659 document,
1660 did_method,
1661 }))),
1662 storage: self.storage,
1663 blob_storage: self.blob_storage,
1664 bind_addr: self.bind_addr,
1665 acme_email: self.acme_email,
1666 stun_server: self.stun_server,
1667 bridge_relay: self.bridge_relay,
1668 nat_strategy: self.nat_strategy,
1669 port_mapper: self.port_mapper,
1670 reachability_probe: self.reachability_probe,
1671 tls_provider: self.tls_provider,
1672 network_detector: self.network_detector,
1673 local_api_addr: self.local_api_addr,
1674 http_bind_addr: self.http_bind_addr,
1675 cors_origins: self.cors_origins,
1676 projection_rate_limit: self.projection_rate_limit,
1677 #[cfg(feature = "http3")]
1678 http3_config: self.http3_config,
1679 persist_identity: self.persist_identity,
1680 _domain_state: PhantomData,
1681 _identity_state: PhantomData,
1682 }
1683 }
1684
1685 pub fn generate_identity_with<K2: KeyCustody + 'static, D2: DidMethod + 'static>(
1689 self,
1690 key_custody: Arc<K2>,
1691 did_method: Arc<D2>,
1692 ) -> ApplicationNodeBuilder<K2, D2, S, Dom, HasIdentity> {
1693 ApplicationNodeBuilder {
1694 domain: self.domain,
1695 identity_source: Some(IdentitySource::Generate {
1696 key_custody,
1697 did_method,
1698 }),
1699 storage: self.storage,
1700 blob_storage: self.blob_storage,
1701 bind_addr: self.bind_addr,
1702 acme_email: self.acme_email,
1703 stun_server: self.stun_server,
1704 bridge_relay: self.bridge_relay,
1705 nat_strategy: self.nat_strategy,
1706 port_mapper: self.port_mapper,
1707 reachability_probe: self.reachability_probe,
1708 tls_provider: self.tls_provider,
1709 network_detector: self.network_detector,
1710 local_api_addr: self.local_api_addr,
1711 http_bind_addr: self.http_bind_addr,
1712 cors_origins: self.cors_origins,
1713 projection_rate_limit: self.projection_rate_limit,
1714 #[cfg(feature = "http3")]
1715 http3_config: self.http3_config,
1716 persist_identity: self.persist_identity,
1717 _domain_state: PhantomData,
1718 _identity_state: PhantomData,
1719 }
1720 }
1721
1722 pub fn identity_with_storage<K2: KeyCustody + 'static, D2: DidMethod + 'static>(
1774 self,
1775 key_custody: Arc<K2>,
1776 did_method: Arc<D2>,
1777 ) -> ApplicationNodeBuilder<K2, D2, S, Dom, HasIdentity> {
1778 ApplicationNodeBuilder {
1779 domain: self.domain,
1780 identity_source: Some(IdentitySource::Generate {
1781 key_custody,
1782 did_method,
1783 }),
1784 storage: self.storage,
1785 blob_storage: self.blob_storage,
1786 bind_addr: self.bind_addr,
1787 acme_email: self.acme_email,
1788 stun_server: self.stun_server,
1789 bridge_relay: self.bridge_relay,
1790 nat_strategy: self.nat_strategy,
1791 port_mapper: self.port_mapper,
1792 reachability_probe: self.reachability_probe,
1793 tls_provider: self.tls_provider,
1794 network_detector: self.network_detector,
1795 local_api_addr: self.local_api_addr,
1796 http_bind_addr: self.http_bind_addr,
1797 cors_origins: self.cors_origins,
1798 projection_rate_limit: self.projection_rate_limit,
1799 #[cfg(feature = "http3")]
1800 http3_config: self.http3_config,
1801 persist_identity: true,
1802 _domain_state: PhantomData,
1803 _identity_state: PhantomData,
1804 }
1805 }
1806}
1807
1808impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: EncryptedStorage + 'static>
1809 ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
1810{
1811 pub async fn build(mut self) -> Result<ApplicationNode<S>, NodeError> {
1823 let storage = self
1824 .storage
1825 .take()
1826 .ok_or(NodeError::MissingField("storage"))?;
1827 let protocol_store = Arc::new(ProtocolStore::new(storage));
1828 self.build_with_store(protocol_store).await
1829 }
1830}
1831
1832#[cfg(any(test, feature = "allow_unencrypted_storage"))]
1834impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
1835 ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
1836{
1837 pub async fn build_for_testing(mut self) -> Result<ApplicationNode<S>, NodeError> {
1845 let storage = self
1846 .storage
1847 .take()
1848 .ok_or(NodeError::MissingField("storage"))?;
1849 let protocol_store = Arc::new(ProtocolStore::new_for_testing(storage));
1850 self.build_with_store(protocol_store).await
1851 }
1852}
1853
1854impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
1855 ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
1856{
1857 #[allow(clippy::too_many_lines)] async fn build_with_store(
1860 self,
1861 protocol_store: Arc<ProtocolStore<S>>,
1862 ) -> Result<ApplicationNode<S>, NodeError> {
1863 let domain = self.domain.ok_or(NodeError::MissingField("domain"))?;
1864 let identity_source = self
1865 .identity_source
1866 .ok_or(NodeError::MissingField("identity"))?;
1867 let persist = self.persist_identity;
1868
1869 let (identity, document, did_method) =
1870 resolve_identity_persistent(identity_source, persist, protocol_store.storage()).await?;
1871 let bridge_secret = generate_bridge_secret();
1872 let bind_addr = self
1873 .bind_addr
1874 .unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 0)));
1875 let relay_config = RelayConfig {
1876 bind_addr,
1877 bridge_secret: Some(*bridge_secret),
1878 ..RelayConfig::default()
1879 };
1880
1881 let blob_storage = Arc::new(
1882 self.blob_storage
1883 .ok_or(NodeError::MissingField("blob_storage"))?,
1884 );
1885 let relay_server = RelayServer::new(relay_config.clone(), Arc::clone(&blob_storage));
1886 let connection_tracker = relay_server.connection_tracker();
1887 let subscription_registry = relay_server.subscriptions();
1888 let (shutdown_handle, bound_addr) = relay_server.start().await?;
1889 let dev_token = self.local_api_addr.map(generate_dev_token);
1890 let http_bind_addr = self.http_bind_addr.unwrap_or(DEFAULT_HTTP_BIND_ADDR);
1891
1892 let tls_provider = resolve_tls(
1893 self.tls_provider,
1894 &domain,
1895 &protocol_store,
1896 self.acme_email.as_ref(),
1897 );
1898
1899 let (provision_result, acme_challenges) =
1900 provision_with_challenge_listener(&*tls_provider).await?;
1901 let rate_limit = self
1902 .projection_rate_limit
1903 .unwrap_or(DEFAULT_PROJECTION_RATE_LIMIT);
1904
1905 match provision_result {
1906 Ok(cert_data) => {
1907 build_domain_inner(
1908 domain,
1909 identity,
1910 document,
1911 did_method,
1912 protocol_store,
1913 shutdown_handle,
1914 bound_addr,
1915 bridge_secret,
1916 dev_token,
1917 self.local_api_addr,
1918 blob_storage,
1919 relay_config,
1920 http_bind_addr,
1921 self.cors_origins.clone(),
1922 rate_limit,
1923 cert_data,
1924 connection_tracker.clone(),
1925 subscription_registry.clone(),
1926 acme_challenges,
1927 #[cfg(feature = "http3")]
1928 self.http3_config,
1929 )
1930 .await
1931 }
1932 Err(tls_err) => {
1933 tracing::warn!(
1934 domain = %domain, error = %tls_err,
1935 "TLS provisioning failed, falling through to NAT-traversed mode (§10.12.8)"
1936 );
1937 let strategy = resolve_nat(
1938 self.nat_strategy,
1939 self.stun_server,
1940 self.bridge_relay,
1941 self.port_mapper,
1942 self.reachability_probe,
1943 );
1944 build_no_domain_inner(
1945 identity,
1946 document,
1947 did_method,
1948 protocol_store,
1949 shutdown_handle,
1950 bound_addr,
1951 strategy,
1952 bridge_secret,
1953 dev_token,
1954 self.local_api_addr,
1955 blob_storage,
1956 relay_config,
1957 Some(http_bind_addr),
1958 self.cors_origins,
1959 rate_limit,
1960 self.network_detector,
1961 connection_tracker,
1962 subscription_registry,
1963 )
1964 .await
1965 }
1966 }
1967 }
1968}
1969
1970async fn resolve_identity<K: KeyCustody, D: DidMethod>(
1977 source: IdentitySource<K, D>,
1978) -> Result<(ScpIdentity, DidDocument, Arc<D>), NodeError> {
1979 match source {
1980 IdentitySource::Generate {
1981 key_custody,
1982 did_method,
1983 } => {
1984 let (identity, document) = did_method.create(&*key_custody).await?;
1985 Ok((identity, document, did_method))
1986 }
1987 IdentitySource::Explicit(e) => Ok((e.identity, e.document, e.did_method)),
1988 }
1989}
1990
1991async fn validate_persisted_custody<K: KeyCustody>(
1999 persisted: &PersistedIdentity,
2000 key_custody: &K,
2001) -> Result<(), NodeError> {
2002 let identity_pub = key_custody
2004 .public_key(&persisted.identity.identity_key)
2005 .await
2006 .map_err(|e| {
2007 NodeError::Storage(format!(
2008 "persisted identity key handle not found in custody: {e}"
2009 ))
2010 })?;
2011 verify_vm_match(&persisted.document, "#0", &identity_pub, "identity key")?;
2012
2013 let active_pub = key_custody
2015 .public_key(&persisted.identity.active_signing_key)
2016 .await
2017 .map_err(|e| {
2018 NodeError::Storage(format!(
2019 "persisted active signing key handle not found in custody: {e}"
2020 ))
2021 })?;
2022 verify_vm_match(
2023 &persisted.document,
2024 "#active",
2025 &active_pub,
2026 "active signing key",
2027 )?;
2028
2029 if let Some(ref agent_key) = persisted.identity.agent_signing_key {
2031 let agent_pub = key_custody.public_key(agent_key).await.map_err(|e| {
2032 NodeError::Storage(format!(
2033 "persisted agent signing key handle not found in custody: {e}"
2034 ))
2035 })?;
2036 verify_vm_match(
2037 &persisted.document,
2038 "#agent",
2039 &agent_pub,
2040 "agent signing key",
2041 )?;
2042 }
2043
2044 Ok(())
2045}
2046
2047fn verify_vm_match(
2054 document: &DidDocument,
2055 vm_suffix: &str,
2056 public_key: &scp_platform::traits::PublicKey,
2057 label: &str,
2058) -> Result<(), NodeError> {
2059 if let Some(vm) = document
2060 .verification_method
2061 .iter()
2062 .find(|vm| vm.id.ends_with(vm_suffix))
2063 {
2064 let expected_multibase = format!("z{}", bs58::encode(public_key.as_bytes()).into_string());
2065 if vm.public_key_multibase != expected_multibase {
2066 return Err(NodeError::Storage(format!(
2067 "custody {label} does not match DID document {vm_suffix} verification method \
2068 (custody: {expected_multibase}, document: {})",
2069 vm.public_key_multibase
2070 )));
2071 }
2072 }
2073 Ok(())
2074}
2075
2076async fn resolve_identity_persistent<K: KeyCustody, D: DidMethod, S: Storage>(
2087 source: IdentitySource<K, D>,
2088 persist: bool,
2089 storage: &S,
2090) -> Result<(ScpIdentity, DidDocument, Arc<D>), NodeError> {
2091 if !persist {
2092 return resolve_identity(source).await;
2093 }
2094
2095 match source {
2096 IdentitySource::Generate {
2097 key_custody,
2098 did_method,
2099 } => {
2100 let existing = storage.retrieve(IDENTITY_STORAGE_KEY).await.map_err(|e| {
2102 NodeError::Storage(format!("failed to read persisted identity: {e}"))
2103 })?;
2104
2105 if let Some(bytes) = existing {
2106 let envelope: StoredValue<PersistedIdentity> = rmp_serde::from_slice(&bytes)
2108 .map_err(|e| {
2109 NodeError::Storage(format!("failed to deserialize persisted identity: {e}"))
2110 })?;
2111
2112 if envelope.version > CURRENT_STORE_VERSION {
2115 return Err(NodeError::Storage(format!(
2116 "persisted identity version {} is newer than supported version {}; \
2117 upgrade the binary or delete the stored identity",
2118 envelope.version, CURRENT_STORE_VERSION
2119 )));
2120 }
2121
2122 let persisted = envelope.data;
2123
2124 validate_persisted_custody(&persisted, &*key_custody).await?;
2126
2127 tracing::info!(
2128 did = %persisted.identity.did,
2129 "reloaded persisted identity from storage"
2130 );
2131 Ok((persisted.identity, persisted.document, did_method))
2132 } else {
2133 let (identity, document) = did_method.create(&*key_custody).await?;
2135 let persisted = PersistedIdentity {
2136 identity: identity.clone(),
2137 document: document.clone(),
2138 };
2139 let envelope = StoredValue {
2140 version: CURRENT_STORE_VERSION,
2141 data: &persisted,
2142 };
2143 let bytes = rmp_serde::to_vec_named(&envelope).map_err(|e| {
2144 NodeError::Storage(format!("failed to serialize identity for persistence: {e}"))
2145 })?;
2146 storage
2147 .store(IDENTITY_STORAGE_KEY, &bytes)
2148 .await
2149 .map_err(|e| {
2150 NodeError::Storage(format!("failed to persist identity to storage: {e}"))
2151 })?;
2152 tracing::info!(
2153 did = %identity.did,
2154 "created and persisted new identity to storage"
2155 );
2156 Ok((identity, document, did_method))
2157 }
2158 }
2159 IdentitySource::Explicit(e) => Ok((e.identity, e.document, e.did_method)),
2161 }
2162}
2163
2164fn generate_bridge_secret() -> Zeroizing<[u8; 32]> {
2168 let mut bytes = [0u8; 32];
2169 rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut bytes);
2170 Zeroizing::new(bytes)
2171}
2172
2173fn generate_dev_token(addr: SocketAddr) -> String {
2182 use rand::RngCore;
2183 let mut bytes = [0u8; 16];
2184 rand::rngs::OsRng.fill_bytes(&mut bytes);
2185 let hex = hex::encode(bytes);
2186 let token = format!("scp_local_token_{hex}");
2187 let masked = &token[..("scp_local_token_".len() + 8)];
2188 tracing::info!(
2189 token_prefix = %masked,
2190 dev_bind_addr = ?addr,
2191 "dev API token generated (use node.dev_token() for full value)"
2192 );
2193 token
2194}
2195
2196fn resolve_tls<S: Storage + 'static>(
2203 provider: Option<Arc<dyn TlsProvider>>,
2204 domain: &str,
2205 storage: &Arc<ProtocolStore<S>>,
2206 acme_email: Option<&String>,
2207) -> Arc<dyn TlsProvider> {
2208 provider.unwrap_or_else(|| {
2209 let mut acme = tls::AcmeProvider::new(domain, Arc::clone(storage));
2210 if let Some(email) = acme_email {
2211 acme = acme.with_email(email);
2212 }
2213 Arc::new(acme)
2214 })
2215}
2216
2217struct AcmeChallengeListener {
2228 shutdown: CancellationToken,
2230 task: tokio::task::JoinHandle<Result<(), NodeError>>,
2232}
2233
2234impl AcmeChallengeListener {
2235 async fn stop(self) {
2237 self.shutdown.cancel();
2238 let _ = self.task.await;
2239 tracing::info!("temporary ACME HTTP-01 challenge listener stopped");
2240 }
2241}
2242
2243async fn start_acme_challenge_listener(
2254 challenges: Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>,
2255) -> Result<AcmeChallengeListener, NodeError> {
2256 let router = tls::acme_challenge_router(challenges);
2257 let shutdown = CancellationToken::new();
2258 let listener = tokio::net::TcpListener::bind("0.0.0.0:80")
2259 .await
2260 .map_err(|e| {
2261 NodeError::Serve(format!(
2262 "failed to bind temporary ACME challenge listener on port 80: {e}"
2263 ))
2264 })?;
2265 let local_addr = listener
2266 .local_addr()
2267 .map_err(|e| NodeError::Serve(e.to_string()))?;
2268 tracing::info!(
2269 addr = %local_addr,
2270 "temporary ACME HTTP-01 challenge listener started"
2271 );
2272 let shutdown_clone = shutdown.clone();
2273 let task = tokio::spawn(async move {
2274 axum::serve(listener, router)
2275 .with_graceful_shutdown(shutdown_clone.cancelled_owned())
2276 .await
2277 .map_err(|e| NodeError::Serve(format!("ACME challenge listener error: {e}")))
2278 });
2279 Ok(AcmeChallengeListener { shutdown, task })
2280}
2281
2282async fn provision_with_challenge_listener(
2295 provider: &dyn TlsProvider,
2296) -> Result<
2297 (
2298 Result<tls::CertificateData, tls::TlsError>,
2299 Option<Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>>,
2300 ),
2301 NodeError,
2302> {
2303 let challenges = provider.challenges();
2304 let acme_listener = if provider.needs_challenge_listener() {
2305 Some(start_acme_challenge_listener(Arc::clone(&challenges)).await?)
2306 } else {
2307 None
2308 };
2309
2310 let result = provider.provision().await;
2311
2312 if let Some(listener) = acme_listener {
2313 listener.stop().await;
2314 }
2315
2316 let acme_challenges = if provider.needs_challenge_listener() {
2317 Some(challenges)
2318 } else {
2319 None
2320 };
2321
2322 Ok((result, acme_challenges))
2323}
2324
2325fn resolve_nat(
2332 strategy: Option<Arc<dyn NatStrategy>>,
2333 stun_server: Option<String>,
2334 bridge_relay: Option<String>,
2335 port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
2336 reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
2337) -> Arc<dyn NatStrategy> {
2338 strategy.unwrap_or_else(|| {
2339 let mut default = DefaultNatStrategy::new(stun_server, bridge_relay);
2340 if let Some(mapper) = port_mapper {
2341 default = default.with_port_mapper(mapper);
2342 }
2343 if let Some(probe) = reachability_probe {
2344 default = default.with_reachability_probe(probe);
2345 }
2346 Arc::new(default)
2347 })
2348}
2349
2350#[allow(clippy::too_many_arguments)]
2355async fn build_domain_inner<D: DidMethod + 'static, S: Storage + 'static>(
2356 domain: String,
2357 identity: ScpIdentity,
2358 mut document: DidDocument,
2359 did_method: Arc<D>,
2360 storage: Arc<ProtocolStore<S>>,
2361 shutdown_handle: ShutdownHandle,
2362 bound_addr: SocketAddr,
2363 bridge_secret: Zeroizing<[u8; 32]>,
2364 dev_token: Option<String>,
2365 dev_bind_addr: Option<SocketAddr>,
2366 blob_storage: Arc<BlobStorageBackend>,
2367 relay_config: RelayConfig,
2368 http_bind_addr: SocketAddr,
2369 cors_origins: Option<Vec<String>>,
2370 projection_rate_limit: u32,
2371 cert_data: tls::CertificateData,
2372 connection_tracker: scp_transport::relay::rate_limit::ConnectionTracker,
2373 subscription_registry: scp_transport::relay::subscription::SubscriptionRegistry,
2374 acme_challenges: Option<Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>>,
2375 #[cfg(feature = "http3")] http3_config: Option<scp_transport::http3::Http3Config>,
2376) -> Result<ApplicationNode<S>, NodeError> {
2377 let relay_url = format!("wss://{domain}/scp/v1");
2378 document.add_relay_service(&relay_url)?;
2379 did_method.publish(&identity, &document).await?;
2380
2381 let (tls_server_config, cert_resolver) =
2385 tls::build_reloadable_tls_config(&cert_data).map_err(NodeError::Tls)?;
2386
2387 tracing::info!(
2388 domain = %domain, relay_url = %relay_url,
2389 bound_addr = %bound_addr, did = %identity.did,
2390 "application node started (domain mode, TLS active)"
2391 );
2392
2393 let state = Arc::new(http::NodeState {
2394 did: identity.did.clone(),
2395 relay_url,
2396 broadcast_contexts: tokio::sync::RwLock::new(HashMap::new()),
2397 relay_addr: bound_addr,
2398 bridge_secret,
2399 dev_token,
2400 dev_bind_addr,
2401 projected_contexts: tokio::sync::RwLock::new(HashMap::new()),
2402 blob_storage,
2403 relay_config,
2404 start_time: std::time::Instant::now(),
2405 http_bind_addr,
2406 shutdown_token: CancellationToken::new(),
2407 cors_origins,
2408 projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
2409 projection_rate_limit,
2410 ),
2411 tls_config: Some(Arc::new(tls_server_config)),
2412 cert_resolver: Some(cert_resolver),
2413 did_document: document.clone(),
2414 connection_tracker,
2415 subscription_registry,
2416 acme_challenges,
2417 bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
2418 });
2419
2420 Ok(ApplicationNode {
2421 domain: Some(domain),
2422 relay: RelayHandle {
2423 bound_addr,
2424 shutdown_handle,
2425 },
2426 identity: IdentityHandle { identity, document },
2427 storage,
2428 state,
2429 tier_reeval: None,
2430 tier_change_rx: None,
2431 #[cfg(feature = "http3")]
2432 http3_config,
2433 })
2434}
2435
2436#[allow(clippy::too_many_arguments)]
2442async fn build_no_domain_inner<D: DidMethod + 'static, S: Storage + 'static>(
2443 identity: ScpIdentity,
2444 mut document: DidDocument,
2445 did_method: Arc<D>,
2446 storage: Arc<ProtocolStore<S>>,
2447 shutdown_handle: ShutdownHandle,
2448 bound_addr: SocketAddr,
2449 nat_strategy: Arc<dyn NatStrategy>,
2450 bridge_secret: Zeroizing<[u8; 32]>,
2451 dev_token: Option<String>,
2452 dev_bind_addr: Option<SocketAddr>,
2453 blob_storage: Arc<BlobStorageBackend>,
2454 relay_config: RelayConfig,
2455 http_bind_addr: Option<SocketAddr>,
2456 cors_origins: Option<Vec<String>>,
2457 projection_rate_limit: u32,
2458 network_detector: Option<Arc<dyn NetworkChangeDetector>>,
2459 connection_tracker: scp_transport::relay::rate_limit::ConnectionTracker,
2460 subscription_registry: scp_transport::relay::subscription::SubscriptionRegistry,
2461) -> Result<ApplicationNode<S>, NodeError> {
2462 let http_bind_addr = http_bind_addr.unwrap_or(DEFAULT_HTTP_BIND_ADDR);
2467
2468 let tier = nat_strategy.select_tier(http_bind_addr.port()).await?;
2469
2470 let relay_url = match &tier {
2471 ReachabilityTier::Upnp { external_addr } | ReachabilityTier::Stun { external_addr } => {
2472 format!("ws://{external_addr}/scp/v1")
2473 }
2474 ReachabilityTier::Bridge { bridge_url } => bridge_url.clone(),
2475 };
2476
2477 let relay_count = document
2478 .service
2479 .iter()
2480 .filter(|s| s.service_type == "SCPRelay")
2481 .count();
2482
2483 document.service.push(scp_identity::document::Service {
2484 id: format!("{}#scp-relay-{}", document.id, relay_count + 1),
2485 service_type: "SCPRelay".to_owned(),
2486 service_endpoint: relay_url.clone(),
2487 });
2488
2489 did_method.publish(&identity, &document).await?;
2491
2492 tracing::info!(
2493 tier = ?tier,
2494 relay_url = %relay_url,
2495 bound_addr = %bound_addr,
2496 did = %identity.did,
2497 "application node started (no-domain mode, §10.12.8)"
2498 );
2499
2500 let publisher: Arc<dyn DidPublisher> = Arc::new(DidMethodPublisher {
2502 inner: Arc::clone(&did_method),
2503 });
2504 let (tier_event_tx, tier_event_rx) = tokio::sync::mpsc::channel(16);
2505 let bg_identity = ScpIdentity {
2508 identity_key: identity.identity_key,
2509 active_signing_key: identity.active_signing_key,
2510 agent_signing_key: identity.agent_signing_key,
2511 pre_rotation_commitment: identity.pre_rotation_commitment,
2512 did: identity.did.clone(),
2513 };
2514 let tier_reeval = spawn_tier_reevaluation(
2515 nat_strategy,
2516 network_detector,
2517 publisher,
2518 bg_identity,
2519 document.clone(),
2520 http_bind_addr.port(),
2521 relay_url.clone(),
2522 Some(tier_event_tx),
2523 TIER_REEVALUATION_INTERVAL,
2524 );
2525
2526 let state = Arc::new(http::NodeState {
2527 did: identity.did.clone(),
2528 relay_url,
2529 broadcast_contexts: tokio::sync::RwLock::new(HashMap::new()),
2530 relay_addr: bound_addr,
2531 bridge_secret,
2532 dev_token,
2533 dev_bind_addr,
2534 projected_contexts: tokio::sync::RwLock::new(HashMap::new()),
2535 blob_storage,
2536 relay_config,
2537 start_time: std::time::Instant::now(),
2538 http_bind_addr,
2539 shutdown_token: CancellationToken::new(),
2540 cors_origins,
2541 projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
2542 projection_rate_limit,
2543 ),
2544 tls_config: None,
2545 cert_resolver: None,
2546 did_document: document.clone(),
2547 connection_tracker,
2548 subscription_registry,
2549 acme_challenges: None,
2550 bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
2551 });
2552
2553 Ok(ApplicationNode {
2555 domain: None,
2556 relay: RelayHandle {
2557 bound_addr,
2558 shutdown_handle,
2559 },
2560 identity: IdentityHandle { identity, document },
2561 storage,
2562 state,
2563 tier_reeval: Some(tier_reeval),
2564 tier_change_rx: Some(tier_event_rx),
2565 #[cfg(feature = "http3")]
2567 http3_config: None,
2568 })
2569}
2570
2571impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: EncryptedStorage + 'static>
2576 ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
2577{
2578 pub async fn build(mut self) -> Result<ApplicationNode<S>, NodeError> {
2587 let storage = self
2588 .storage
2589 .take()
2590 .ok_or(NodeError::MissingField("storage"))?;
2591 let protocol_store = Arc::new(ProtocolStore::new(storage));
2592 self.build_with_store(protocol_store).await
2593 }
2594}
2595
2596#[cfg(any(test, feature = "allow_unencrypted_storage"))]
2598impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
2599 ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
2600{
2601 pub async fn build_for_testing(mut self) -> Result<ApplicationNode<S>, NodeError> {
2608 let storage = self
2609 .storage
2610 .take()
2611 .ok_or(NodeError::MissingField("storage"))?;
2612 let protocol_store = Arc::new(ProtocolStore::new_for_testing(storage));
2613 self.build_with_store(protocol_store).await
2614 }
2615}
2616
2617impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
2618 ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
2619{
2620 async fn build_with_store(
2622 self,
2623 protocol_store: Arc<ProtocolStore<S>>,
2624 ) -> Result<ApplicationNode<S>, NodeError> {
2625 let identity_source = self
2626 .identity_source
2627 .ok_or(NodeError::MissingField("identity"))?;
2628 let persist = self.persist_identity;
2629
2630 let (identity, document, did_method) =
2631 resolve_identity_persistent(identity_source, persist, protocol_store.storage()).await?;
2632
2633 let bind_addr = self
2635 .bind_addr
2636 .unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 0)));
2637 let bridge_secret = generate_bridge_secret();
2638 let relay_config = RelayConfig {
2639 bind_addr,
2640 bridge_secret: Some(*bridge_secret),
2641 ..RelayConfig::default()
2642 };
2643
2644 let blob_storage = Arc::new(
2645 self.blob_storage
2646 .ok_or(NodeError::MissingField("blob_storage"))?,
2647 );
2648 let relay_server = RelayServer::new(relay_config.clone(), Arc::clone(&blob_storage));
2649 let connection_tracker = relay_server.connection_tracker();
2650 let subscription_registry = relay_server.subscriptions();
2651 let (shutdown_handle, bound_addr) = relay_server.start().await?;
2652
2653 let dev_token = self.local_api_addr.map(generate_dev_token);
2655
2656 let strategy = resolve_nat(
2658 self.nat_strategy,
2659 self.stun_server,
2660 self.bridge_relay,
2661 self.port_mapper,
2662 self.reachability_probe,
2663 );
2664
2665 build_no_domain_inner(
2666 identity,
2667 document,
2668 did_method,
2669 protocol_store,
2670 shutdown_handle,
2671 bound_addr,
2672 strategy,
2673 bridge_secret,
2674 dev_token,
2675 self.local_api_addr,
2676 blob_storage,
2677 relay_config,
2678 self.http_bind_addr,
2679 self.cors_origins,
2680 self.projection_rate_limit
2681 .unwrap_or(DEFAULT_PROJECTION_RATE_LIMIT),
2682 self.network_detector,
2683 connection_tracker,
2684 subscription_registry,
2685 )
2686 .await
2687 }
2688}
2689
2690#[doc(hidden)]
2699pub struct NoOpCustody;
2700
2701impl KeyCustody for NoOpCustody {
2702 fn generate_keypair(
2703 &self,
2704 _key_type: scp_platform::KeyType,
2705 ) -> impl std::future::Future<
2706 Output = Result<scp_platform::KeyHandle, scp_platform::PlatformError>,
2707 > + Send {
2708 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2709 "NoOpCustody: not configured".to_owned(),
2710 )))
2711 }
2712
2713 fn public_key(
2714 &self,
2715 _handle: &scp_platform::KeyHandle,
2716 ) -> impl std::future::Future<
2717 Output = Result<scp_platform::PublicKey, scp_platform::PlatformError>,
2718 > + Send {
2719 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2720 "NoOpCustody: not configured".to_owned(),
2721 )))
2722 }
2723
2724 fn sign(
2725 &self,
2726 _handle: &scp_platform::KeyHandle,
2727 _data: &[u8],
2728 ) -> impl std::future::Future<
2729 Output = Result<scp_platform::Signature, scp_platform::PlatformError>,
2730 > + Send {
2731 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2732 "NoOpCustody: not configured".to_owned(),
2733 )))
2734 }
2735
2736 fn destroy_key(
2737 &self,
2738 _handle: &scp_platform::KeyHandle,
2739 ) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
2740 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2741 "NoOpCustody: not configured".to_owned(),
2742 )))
2743 }
2744
2745 fn dh_agree(
2746 &self,
2747 _handle: &scp_platform::KeyHandle,
2748 _peer_public: &[u8; 32],
2749 ) -> impl std::future::Future<
2750 Output = Result<scp_platform::SharedSecret, scp_platform::PlatformError>,
2751 > + Send {
2752 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2753 "NoOpCustody: not configured".to_owned(),
2754 )))
2755 }
2756
2757 fn derive_pseudonym(
2758 &self,
2759 _handle: &scp_platform::KeyHandle,
2760 _context_id: &[u8],
2761 ) -> impl std::future::Future<
2762 Output = Result<scp_platform::PseudonymKeypair, scp_platform::PlatformError>,
2763 > + Send {
2764 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2765 "NoOpCustody: not configured".to_owned(),
2766 )))
2767 }
2768
2769 fn derive_rotatable_pseudonym(
2770 &self,
2771 _handle: &scp_platform::KeyHandle,
2772 _context_id: &[u8],
2773 _pseudonym_epoch: u64,
2774 ) -> impl std::future::Future<
2775 Output = Result<scp_platform::PseudonymKeypair, scp_platform::PlatformError>,
2776 > + Send {
2777 std::future::ready(Err(scp_platform::PlatformError::StorageError(
2778 "NoOpCustody: not configured".to_owned(),
2779 )))
2780 }
2781
2782 fn custody_type(&self, _handle: &scp_platform::KeyHandle) -> scp_platform::CustodyType {
2783 scp_platform::CustodyType::InMemory
2784 }
2785}
2786
2787#[doc(hidden)]
2791pub struct NoOpDidMethod;
2792
2793impl DidMethod for NoOpDidMethod {
2794 fn create(
2795 &self,
2796 _key_custody: &impl KeyCustody,
2797 ) -> impl std::future::Future<Output = Result<(ScpIdentity, DidDocument), IdentityError>> + Send
2798 {
2799 std::future::ready(Err(IdentityError::DhtPublishFailed(
2800 "NoOpDidMethod: not configured".to_owned(),
2801 )))
2802 }
2803
2804 fn verify(&self, _did_string: &str, _public_key: &[u8]) -> bool {
2805 false
2806 }
2807
2808 fn publish(
2809 &self,
2810 _identity: &ScpIdentity,
2811 _document: &DidDocument,
2812 ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
2813 std::future::ready(Err(IdentityError::DhtPublishFailed(
2814 "NoOpDidMethod: not configured".to_owned(),
2815 )))
2816 }
2817
2818 fn resolve(
2819 &self,
2820 _did_string: &str,
2821 ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send {
2822 std::future::ready(Err(IdentityError::DhtResolveFailed(
2823 "NoOpDidMethod: not configured".to_owned(),
2824 )))
2825 }
2826
2827 fn rotate(
2828 &self,
2829 _identity: &ScpIdentity,
2830 _key_custody: &impl KeyCustody,
2831 ) -> impl std::future::Future<Output = Result<(ScpIdentity, DidDocument), IdentityError>> + Send
2832 {
2833 std::future::ready(Err(IdentityError::KeyRotationFailed(
2834 "NoOpDidMethod: not configured".to_owned(),
2835 )))
2836 }
2837}
2838
2839#[doc(hidden)]
2842#[derive(Debug, Default)]
2843pub struct NoOpStorage;
2844
2845impl Storage for NoOpStorage {
2846 fn store(
2847 &self,
2848 _key: &str,
2849 _data: &[u8],
2850 ) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
2851 std::future::ready(Ok(()))
2852 }
2853
2854 fn retrieve(
2855 &self,
2856 _key: &str,
2857 ) -> impl std::future::Future<Output = Result<Option<Vec<u8>>, scp_platform::PlatformError>> + Send
2858 {
2859 std::future::ready(Ok(None))
2860 }
2861
2862 fn delete(
2863 &self,
2864 _key: &str,
2865 ) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
2866 std::future::ready(Ok(()))
2867 }
2868
2869 fn list_keys(
2870 &self,
2871 _prefix: &str,
2872 ) -> impl std::future::Future<Output = Result<Vec<String>, scp_platform::PlatformError>> + Send
2873 {
2874 std::future::ready(Ok(Vec::new()))
2875 }
2876
2877 fn delete_prefix(
2878 &self,
2879 _prefix: &str,
2880 ) -> impl std::future::Future<Output = Result<u64, scp_platform::PlatformError>> + Send {
2881 std::future::ready(Ok(0))
2882 }
2883
2884 fn exists(
2885 &self,
2886 _key: &str,
2887 ) -> impl std::future::Future<Output = Result<bool, scp_platform::PlatformError>> + Send {
2888 std::future::ready(Ok(false))
2889 }
2890}
2891
2892#[cfg(test)]
2897#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2898mod tests {
2899 use super::*;
2900 use std::sync::Arc;
2901
2902 use scp_identity::DidCache;
2903 use scp_identity::cache::SystemClock;
2904 use scp_identity::dht::DidDht;
2905 use scp_identity::dht_client::InMemoryDhtClient;
2906 use scp_platform::testing::{InMemoryKeyCustody, InMemoryStorage};
2907
2908 type TestDidDht = DidDht<InMemoryDhtClient, SystemClock>;
2910
2911 fn make_test_dht(custody: &Arc<InMemoryKeyCustody>) -> TestDidDht {
2913 let dht_client = Arc::new(InMemoryDhtClient::new());
2914 let cache = Arc::new(DidCache::new());
2915 let sign_fn = TestDidDht::make_sign_fn(Arc::clone(custody));
2916 DidDht::with_client_and_signer(dht_client, cache, sign_fn)
2917 }
2918
2919 struct SucceedingTlsProvider {
2921 domain: String,
2922 }
2923
2924 impl TlsProvider for SucceedingTlsProvider {
2925 fn provision(
2926 &self,
2927 ) -> std::pin::Pin<
2928 Box<
2929 dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
2930 + Send
2931 + '_,
2932 >,
2933 > {
2934 let domain = self.domain.clone();
2935 Box::pin(async move { tls::generate_self_signed(&domain) })
2936 }
2937 }
2938
2939 struct FailingTlsProvider;
2941
2942 impl TlsProvider for FailingTlsProvider {
2943 fn provision(
2944 &self,
2945 ) -> std::pin::Pin<
2946 Box<
2947 dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
2948 + Send
2949 + '_,
2950 >,
2951 > {
2952 Box::pin(async {
2953 Err(tls::TlsError::Acme(
2954 "ACME challenge failed (mock)".to_owned(),
2955 ))
2956 })
2957 }
2958 }
2959
2960 fn test_builder() -> ApplicationNodeBuilder<
2965 InMemoryKeyCustody,
2966 TestDidDht,
2967 InMemoryStorage,
2968 HasDomain,
2969 HasIdentity,
2970 > {
2971 let custody = Arc::new(InMemoryKeyCustody::new());
2972 let did_method = Arc::new(make_test_dht(&custody));
2973 ApplicationNodeBuilder::new()
2974 .storage(InMemoryStorage::new())
2975 .domain("test.example.com")
2976 .tls_provider(Arc::new(SucceedingTlsProvider {
2977 domain: "test.example.com".to_owned(),
2978 }))
2979 .generate_identity_with(custody, did_method)
2980 }
2981
2982 async fn create_test_identity() -> (ScpIdentity, DidDocument, Arc<InMemoryKeyCustody>) {
2984 let custody = Arc::new(InMemoryKeyCustody::new());
2985 let did_dht = make_test_dht(&custody);
2986 let (identity, document) = did_dht.create(&*custody).await.unwrap();
2987 (identity, document, custody)
2988 }
2989
2990 #[tokio::test]
3007 async fn type_state_builder_compiles_with_all_required_fields() {
3008 let custody = Arc::new(InMemoryKeyCustody::new());
3009 let did_method = Arc::new(make_test_dht(&custody));
3010
3011 let _builder = ApplicationNodeBuilder::new()
3013 .domain("test.example.com")
3014 .generate_identity_with(custody, did_method);
3015
3016 }
3020
3021 #[test]
3022 fn type_state_optional_fields_at_any_point() {
3023 let _builder = ApplicationNodeBuilder::new()
3026 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3027 .acme_email("test@example.com");
3028
3029 let custody = Arc::new(InMemoryKeyCustody::new());
3031 let did_method = Arc::new(make_test_dht(&custody));
3032 let _builder = ApplicationNodeBuilder::new()
3033 .domain("test.example.com")
3034 .generate_identity_with(custody, did_method)
3035 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3036 .acme_email("test@example.com");
3037 }
3038
3039 #[tokio::test]
3040 async fn build_with_generate_identity_creates_new_did() {
3041 let node = test_builder()
3042 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3043 .build_for_testing()
3044 .await
3045 .unwrap();
3046
3047 assert!(
3049 node.identity().did().starts_with("did:dht:"),
3050 "DID should start with did:dht:, got: {}",
3051 node.identity().did()
3052 );
3053
3054 let relay_urls = node.identity().document().relay_service_urls();
3056 assert_eq!(relay_urls.len(), 1);
3057 assert_eq!(relay_urls[0], "wss://test.example.com/scp/v1");
3058
3059 assert_eq!(node.domain(), Some("test.example.com"));
3061 assert_eq!(node.relay_url(), "wss://test.example.com/scp/v1");
3062
3063 let addr = node.relay().bound_addr();
3065 assert_ne!(addr.port(), 0, "relay should be bound to a real port");
3066 }
3067
3068 #[tokio::test]
3069 async fn build_with_explicit_identity_uses_provided_identity() {
3070 let (identity, document, custody) = create_test_identity().await;
3071 let original_did = identity.did.clone();
3072 let did_method = Arc::new(make_test_dht(&custody));
3073
3074 let node = ApplicationNodeBuilder::new()
3075 .storage(InMemoryStorage::new())
3076 .domain("explicit.example.com")
3077 .tls_provider(Arc::new(SucceedingTlsProvider {
3078 domain: "explicit.example.com".to_owned(),
3079 }))
3080 .identity(identity, document, did_method)
3081 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3082 .build_for_testing()
3083 .await
3084 .unwrap();
3085
3086 assert_eq!(node.identity().did(), original_did);
3088
3089 let relay_urls = node.identity().document().relay_service_urls();
3091 assert!(
3092 relay_urls.contains(&"wss://explicit.example.com/scp/v1".to_owned()),
3093 "expected relay URL in document, got: {relay_urls:?}"
3094 );
3095 }
3096
3097 #[tokio::test]
3098 async fn did_publication_happens_once_on_build() {
3099 use std::sync::atomic::{AtomicU32, Ordering};
3100
3101 struct CountingDidMethod {
3103 inner: TestDidDht,
3104 publish_count: Arc<AtomicU32>,
3105 }
3106
3107 impl DidMethod for CountingDidMethod {
3108 fn create(
3109 &self,
3110 key_custody: &impl KeyCustody,
3111 ) -> impl std::future::Future<
3112 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3113 > + Send {
3114 self.inner.create(key_custody)
3115 }
3116
3117 fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
3118 self.inner.verify(did_string, public_key)
3119 }
3120
3121 fn publish(
3122 &self,
3123 identity: &ScpIdentity,
3124 document: &DidDocument,
3125 ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
3126 self.publish_count.fetch_add(1, Ordering::SeqCst);
3127 self.inner.publish(identity, document)
3128 }
3129
3130 fn resolve(
3131 &self,
3132 did_string: &str,
3133 ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
3134 {
3135 self.inner.resolve(did_string)
3136 }
3137
3138 fn rotate(
3139 &self,
3140 identity: &ScpIdentity,
3141 key_custody: &impl KeyCustody,
3142 ) -> impl std::future::Future<
3143 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3144 > + Send {
3145 self.inner.rotate(identity, key_custody)
3146 }
3147 }
3148
3149 let custody = Arc::new(InMemoryKeyCustody::new());
3150 let publish_count = Arc::new(AtomicU32::new(0));
3151 let counting_method = Arc::new(CountingDidMethod {
3152 inner: make_test_dht(&custody),
3153 publish_count: Arc::clone(&publish_count),
3154 });
3155
3156 let _node = ApplicationNodeBuilder::new()
3157 .storage(InMemoryStorage::new())
3158 .domain("counting.example.com")
3159 .tls_provider(Arc::new(SucceedingTlsProvider {
3160 domain: "counting.example.com".to_owned(),
3161 }))
3162 .generate_identity_with(custody, counting_method)
3163 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3164 .build_for_testing()
3165 .await
3166 .unwrap();
3167
3168 assert_eq!(
3170 publish_count.load(Ordering::SeqCst),
3171 1,
3172 "DID should be published exactly once on build"
3173 );
3174 }
3175
3176 #[tokio::test]
3177 async fn relay_accepts_connections_with_valid_bridge_token() {
3178 use tokio_tungstenite::tungstenite::client::IntoClientRequest;
3179
3180 let node = test_builder()
3181 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3182 .build_for_testing()
3183 .await
3184 .unwrap();
3185
3186 let addr = node.relay().bound_addr();
3187 let token = node.bridge_token_hex();
3188
3189 let url = format!("ws://{addr}/");
3191 let mut request = url.into_client_request().unwrap();
3192 request
3193 .headers_mut()
3194 .insert("Authorization", format!("Bearer {token}").parse().unwrap());
3195 let connect_result = tokio_tungstenite::connect_async(request).await;
3196
3197 assert!(
3198 connect_result.is_ok(),
3199 "relay should accept connections with valid bridge token, got error: {:?}",
3200 connect_result.err()
3201 );
3202 }
3203
3204 #[tokio::test]
3205 async fn relay_rejects_connections_without_bridge_token() {
3206 let node = test_builder()
3207 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3208 .build_for_testing()
3209 .await
3210 .unwrap();
3211
3212 let addr = node.relay().bound_addr();
3213
3214 let url = format!("ws://{addr}/");
3216 let connect_result = tokio_tungstenite::connect_async(&url).await;
3217
3218 assert!(
3219 connect_result.is_err(),
3220 "relay should reject connections without bridge token"
3221 );
3222 }
3223
3224 #[tokio::test]
3225 async fn relay_listening_before_did_publish() {
3226 use std::sync::atomic::{AtomicBool, Ordering};
3227
3228 struct RelayCheckDidMethod {
3231 inner: TestDidDht,
3232 relay_was_listening_at_publish: Arc<AtomicBool>,
3233 bind_addr: SocketAddr,
3234 }
3235
3236 impl DidMethod for RelayCheckDidMethod {
3237 fn create(
3238 &self,
3239 key_custody: &impl KeyCustody,
3240 ) -> impl std::future::Future<
3241 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3242 > + Send {
3243 self.inner.create(key_custody)
3244 }
3245
3246 fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
3247 self.inner.verify(did_string, public_key)
3248 }
3249
3250 fn publish(
3251 &self,
3252 identity: &ScpIdentity,
3253 document: &DidDocument,
3254 ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
3255 let addr = self.bind_addr;
3257 let flag = Arc::clone(&self.relay_was_listening_at_publish);
3258 let inner = &self.inner;
3259 async move {
3260 if tokio::net::TcpStream::connect(addr).await.is_ok() {
3262 flag.store(true, Ordering::SeqCst);
3263 }
3264 inner.publish(identity, document).await
3265 }
3266 }
3267
3268 fn resolve(
3269 &self,
3270 did_string: &str,
3271 ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
3272 {
3273 self.inner.resolve(did_string)
3274 }
3275
3276 fn rotate(
3277 &self,
3278 identity: &ScpIdentity,
3279 key_custody: &impl KeyCustody,
3280 ) -> impl std::future::Future<
3281 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3282 > + Send {
3283 self.inner.rotate(identity, key_custody)
3284 }
3285 }
3286
3287 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3292 let bind_addr = listener.local_addr().unwrap();
3293 drop(listener); let custody = Arc::new(InMemoryKeyCustody::new());
3296 let relay_was_listening = Arc::new(AtomicBool::new(false));
3297
3298 let check_method = Arc::new(RelayCheckDidMethod {
3299 inner: make_test_dht(&custody),
3300 relay_was_listening_at_publish: Arc::clone(&relay_was_listening),
3301 bind_addr,
3302 });
3303
3304 let _node = ApplicationNodeBuilder::new()
3305 .storage(InMemoryStorage::new())
3306 .domain("relay-order.example.com")
3307 .tls_provider(Arc::new(SucceedingTlsProvider {
3308 domain: "relay-order.example.com".to_owned(),
3309 }))
3310 .generate_identity_with(custody, check_method)
3311 .bind_addr(bind_addr)
3312 .build_for_testing()
3313 .await
3314 .unwrap();
3315
3316 assert!(
3317 relay_was_listening.load(Ordering::SeqCst),
3318 "relay must be listening BEFORE DID document is published"
3319 );
3320 }
3321
3322 #[tokio::test]
3323 async fn builder_domain_sets_relay_url() {
3324 let node = test_builder()
3325 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3326 .build_for_testing()
3327 .await
3328 .unwrap();
3329
3330 assert_eq!(node.relay_url(), "wss://test.example.com/scp/v1");
3331 }
3332
3333 #[tokio::test]
3334 async fn builder_with_custom_storage() {
3335 let custom_storage = InMemoryStorage::new();
3336 let custody = Arc::new(InMemoryKeyCustody::new());
3337 let did_method = Arc::new(make_test_dht(&custody));
3338
3339 let node = ApplicationNodeBuilder::new()
3340 .storage(custom_storage)
3341 .domain("storage.example.com")
3342 .tls_provider(Arc::new(SucceedingTlsProvider {
3343 domain: "storage.example.com".to_owned(),
3344 }))
3345 .generate_identity_with(custody, did_method)
3346 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3347 .build_for_testing()
3348 .await
3349 .unwrap();
3350
3351 let _storage = node.storage();
3353 }
3354
3355 #[tokio::test]
3356 async fn builder_with_acme_email() {
3357 let node = test_builder()
3359 .acme_email("admin@example.com")
3360 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3361 .build_for_testing()
3362 .await
3363 .unwrap();
3364
3365 assert!(
3366 node.identity().did().starts_with("did:dht:"),
3367 "node should build successfully with acme_email set"
3368 );
3369 }
3370
3371 struct MockNatStrategy {
3375 tier: ReachabilityTier,
3376 }
3377
3378 impl NatStrategy for MockNatStrategy {
3379 fn select_tier(
3380 &self,
3381 _relay_port: u16,
3382 ) -> std::pin::Pin<
3383 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
3384 > {
3385 let tier = self.tier.clone();
3386 Box::pin(async move { Ok(tier) })
3387 }
3388 }
3389
3390 struct FailingNatStrategy;
3392
3393 impl NatStrategy for FailingNatStrategy {
3394 fn select_tier(
3395 &self,
3396 _relay_port: u16,
3397 ) -> std::pin::Pin<
3398 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
3399 > {
3400 Box::pin(async { Err(NodeError::Nat("all tiers failed".into())) })
3401 }
3402 }
3403
3404 fn test_no_domain_builder(
3407 tier: ReachabilityTier,
3408 ) -> ApplicationNodeBuilder<
3409 InMemoryKeyCustody,
3410 TestDidDht,
3411 InMemoryStorage,
3412 HasNoDomain,
3413 HasIdentity,
3414 > {
3415 let custody = Arc::new(InMemoryKeyCustody::new());
3416 let did_method = Arc::new(make_test_dht(&custody));
3417 ApplicationNodeBuilder::new()
3418 .storage(InMemoryStorage::new())
3419 .no_domain()
3420 .nat_strategy(Arc::new(MockNatStrategy { tier }))
3421 .generate_identity_with(custody, did_method)
3422 }
3423
3424 #[test]
3425 fn no_domain_method_exists_and_transitions_type_state() {
3426 let custody = Arc::new(InMemoryKeyCustody::new());
3428 let did_method = Arc::new(make_test_dht(&custody));
3429
3430 let _builder = ApplicationNodeBuilder::new()
3431 .no_domain()
3432 .generate_identity_with(custody, did_method);
3433
3434 }
3436
3437 #[test]
3438 fn stun_server_method_exists_on_builder() {
3439 let _builder = ApplicationNodeBuilder::new().stun_server("stun.example.com:3478");
3441
3442 let custody = Arc::new(InMemoryKeyCustody::new());
3444 let did_method = Arc::new(make_test_dht(&custody));
3445 let _builder = ApplicationNodeBuilder::new()
3446 .stun_server("stun.example.com:3478")
3447 .no_domain()
3448 .generate_identity_with(custody, did_method);
3449 }
3450
3451 #[test]
3452 fn bridge_relay_method_exists_on_builder() {
3453 let _builder =
3455 ApplicationNodeBuilder::new().bridge_relay("wss://bridge.example.com/scp/v1");
3456 }
3457
3458 #[tokio::test]
3459 async fn no_domain_build_skips_tls_and_publishes_ws_url() {
3460 let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
3462 let tier = ReachabilityTier::Stun { external_addr };
3463
3464 let node = test_no_domain_builder(tier)
3465 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3466 .build_for_testing()
3467 .await
3468 .unwrap();
3469
3470 assert!(
3472 node.domain().is_none(),
3473 "no-domain mode should have None domain"
3474 );
3475
3476 assert!(
3478 node.relay_url().starts_with("ws://"),
3479 "no-domain mode should publish ws:// URL, got: {}",
3480 node.relay_url()
3481 );
3482 assert_eq!(node.relay_url(), "ws://198.51.100.7:32891/scp/v1");
3483
3484 let relay_urls = node.identity().document().relay_service_urls();
3486 assert_eq!(relay_urls.len(), 1);
3487 assert_eq!(relay_urls[0], "ws://198.51.100.7:32891/scp/v1");
3488
3489 assert!(
3491 node.identity().did().starts_with("did:dht:"),
3492 "DID should start with did:dht:"
3493 );
3494
3495 assert_ne!(node.relay().bound_addr().port(), 0);
3497 }
3498
3499 #[tokio::test]
3500 async fn no_domain_build_with_bridge_publishes_wss_url() {
3501 let tier = ReachabilityTier::Bridge {
3503 bridge_url: "wss://bridge.example.com/scp/v1?bridge_target=deadbeef".to_owned(),
3504 };
3505
3506 let node = test_no_domain_builder(tier)
3507 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3508 .build_for_testing()
3509 .await
3510 .unwrap();
3511
3512 assert!(
3514 node.relay_url().starts_with("wss://"),
3515 "bridge mode should publish wss:// URL, got: {}",
3516 node.relay_url()
3517 );
3518 assert_eq!(
3519 node.relay_url(),
3520 "wss://bridge.example.com/scp/v1?bridge_target=deadbeef"
3521 );
3522
3523 let relay_urls = node.identity().document().relay_service_urls();
3525 assert_eq!(relay_urls.len(), 1);
3526 assert!(relay_urls[0].contains("bridge_target="));
3527 }
3528
3529 #[tokio::test]
3530 async fn no_domain_build_with_upnp_tier_publishes_ws_url() {
3531 let external_addr = SocketAddr::from(([203, 0, 113, 42], 8443));
3533 let tier = ReachabilityTier::Upnp { external_addr };
3534
3535 let node = test_no_domain_builder(tier)
3536 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3537 .build_for_testing()
3538 .await
3539 .unwrap();
3540
3541 assert_eq!(node.relay_url(), "ws://203.0.113.42:8443/scp/v1");
3542 }
3543
3544 #[tokio::test]
3545 async fn no_domain_does_not_serve_well_known() {
3546 let tier = ReachabilityTier::Stun {
3554 external_addr: SocketAddr::from(([198, 51, 100, 7], 32891)),
3555 };
3556
3557 let node = test_no_domain_builder(tier)
3558 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3559 .build_for_testing()
3560 .await
3561 .unwrap();
3562
3563 assert!(
3564 node.domain().is_none(),
3565 "no-domain mode: domain must be None to prevent .well-known/scp serving"
3566 );
3567 }
3568
3569 #[tokio::test]
3570 async fn domain_build_uses_wss_no_regression() {
3571 let node = test_builder()
3573 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3574 .build_for_testing()
3575 .await
3576 .unwrap();
3577
3578 assert!(
3579 node.relay_url().starts_with("wss://"),
3580 "domain mode should use wss://, got: {}",
3581 node.relay_url()
3582 );
3583 assert_eq!(node.relay_url(), "wss://test.example.com/scp/v1");
3584 assert_eq!(node.domain(), Some("test.example.com"));
3585 }
3586
3587 #[tokio::test]
3588 async fn domain_fallthrough_on_acme_failure_probes_nat() {
3589 use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
3593
3594 struct RecordingNatStrategy {
3596 called: Arc<AtomicBool>,
3597 received_port: Arc<AtomicU16>,
3598 tier: ReachabilityTier,
3599 }
3600
3601 impl NatStrategy for RecordingNatStrategy {
3602 fn select_tier(
3603 &self,
3604 relay_port: u16,
3605 ) -> std::pin::Pin<
3606 Box<
3607 dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>>
3608 + Send
3609 + '_,
3610 >,
3611 > {
3612 self.called.store(true, Ordering::SeqCst);
3613 self.received_port.store(relay_port, Ordering::SeqCst);
3614 let tier = self.tier.clone();
3615 Box::pin(async move { Ok(tier) })
3616 }
3617 }
3618
3619 let nat_called = Arc::new(AtomicBool::new(false));
3620 let nat_port = Arc::new(AtomicU16::new(0));
3621 let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
3622
3623 let custody = Arc::new(InMemoryKeyCustody::new());
3624 let did_method = Arc::new(make_test_dht(&custody));
3625
3626 let node = ApplicationNodeBuilder::new()
3627 .storage(InMemoryStorage::new())
3628 .domain("fail.example.com")
3629 .tls_provider(Arc::new(FailingTlsProvider))
3630 .nat_strategy(Arc::new(RecordingNatStrategy {
3631 called: Arc::clone(&nat_called),
3632 received_port: Arc::clone(&nat_port),
3633 tier: ReachabilityTier::Stun { external_addr },
3634 }))
3635 .generate_identity_with(custody, did_method)
3636 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3637 .build_for_testing()
3638 .await
3639 .unwrap();
3640
3641 assert!(
3643 node.domain().is_none(),
3644 "domain should be None after TLS fallthrough"
3645 );
3646
3647 assert!(
3649 nat_called.load(Ordering::SeqCst),
3650 "NAT strategy should have been called on ACME failure fallthrough"
3651 );
3652
3653 assert_eq!(
3655 nat_port.load(Ordering::SeqCst),
3656 DEFAULT_HTTP_BIND_ADDR.port(),
3657 "NAT strategy should receive the HTTP port ({}), not the relay port",
3658 DEFAULT_HTTP_BIND_ADDR.port()
3659 );
3660
3661 assert!(
3663 node.relay_url().starts_with("ws://"),
3664 "fallthrough should use ws:// URL, got: {}",
3665 node.relay_url()
3666 );
3667 assert_eq!(node.relay_url(), "ws://198.51.100.7:32891/scp/v1");
3668
3669 assert_ne!(
3671 node.relay().bound_addr().port(),
3672 0,
3673 "relay should be bound to a real port after fallthrough"
3674 );
3675
3676 assert!(
3678 node.identity().did().starts_with("did:dht:"),
3679 "DID should start with did:dht:"
3680 );
3681 }
3682
3683 #[tokio::test]
3684 async fn no_domain_nat_failure_returns_error() {
3685 let custody = Arc::new(InMemoryKeyCustody::new());
3687 let did_method = Arc::new(make_test_dht(&custody));
3688
3689 let result = ApplicationNodeBuilder::new()
3690 .storage(InMemoryStorage::new())
3691 .no_domain()
3692 .nat_strategy(Arc::new(FailingNatStrategy))
3693 .generate_identity_with(custody, did_method)
3694 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3695 .build_for_testing()
3696 .await;
3697
3698 let Err(err) = result else {
3699 panic!("build() should fail when all NAT tiers fail");
3700 };
3701 assert!(
3702 matches!(err, NodeError::Nat(_)),
3703 "error should be NodeError::Nat, got: {err:?}"
3704 );
3705 }
3706
3707 #[tokio::test]
3708 async fn no_domain_did_publication_happens_once() {
3709 use std::sync::atomic::{AtomicU32, Ordering};
3710
3711 struct CountingDidMethod {
3712 inner: TestDidDht,
3713 publish_count: Arc<AtomicU32>,
3714 }
3715
3716 impl DidMethod for CountingDidMethod {
3717 fn create(
3718 &self,
3719 key_custody: &impl KeyCustody,
3720 ) -> impl std::future::Future<
3721 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3722 > + Send {
3723 self.inner.create(key_custody)
3724 }
3725
3726 fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
3727 self.inner.verify(did_string, public_key)
3728 }
3729
3730 fn publish(
3731 &self,
3732 identity: &ScpIdentity,
3733 document: &DidDocument,
3734 ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
3735 self.publish_count.fetch_add(1, Ordering::SeqCst);
3736 self.inner.publish(identity, document)
3737 }
3738
3739 fn resolve(
3740 &self,
3741 did_string: &str,
3742 ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
3743 {
3744 self.inner.resolve(did_string)
3745 }
3746
3747 fn rotate(
3748 &self,
3749 identity: &ScpIdentity,
3750 key_custody: &impl KeyCustody,
3751 ) -> impl std::future::Future<
3752 Output = Result<(ScpIdentity, DidDocument), IdentityError>,
3753 > + Send {
3754 self.inner.rotate(identity, key_custody)
3755 }
3756 }
3757
3758 let custody = Arc::new(InMemoryKeyCustody::new());
3759 let publish_count = Arc::new(AtomicU32::new(0));
3760 let counting_method = Arc::new(CountingDidMethod {
3761 inner: make_test_dht(&custody),
3762 publish_count: Arc::clone(&publish_count),
3763 });
3764
3765 let tier = ReachabilityTier::Stun {
3766 external_addr: SocketAddr::from(([198, 51, 100, 7], 32891)),
3767 };
3768
3769 let _node = ApplicationNodeBuilder::new()
3770 .storage(InMemoryStorage::new())
3771 .no_domain()
3772 .nat_strategy(Arc::new(MockNatStrategy { tier }))
3773 .generate_identity_with(custody, counting_method)
3774 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
3775 .build_for_testing()
3776 .await
3777 .unwrap();
3778
3779 assert_eq!(
3780 publish_count.load(Ordering::SeqCst),
3781 1,
3782 "DID should be published exactly once on no-domain build"
3783 );
3784 }
3785
3786 #[test]
3787 fn default_stun_endpoints_parseable() {
3788 for (addr, _label) in DEFAULT_STUN_ENDPOINTS {
3789 let parsed: std::net::SocketAddr = addr
3790 .parse()
3791 .unwrap_or_else(|e| panic!("STUN endpoint '{addr}' not parseable: {e}"));
3792 assert_ne!(parsed.port(), 0);
3793 }
3794 }
3795
3796 fn build_stun_binding_response(addr: SocketAddr, transaction_id: &[u8; 12]) -> Vec<u8> {
3804 const MAGIC_COOKIE: u32 = 0x2112_A442;
3805 const BINDING_RESPONSE: u16 = 0x0101;
3806 const ATTR_XOR_MAPPED_ADDRESS: u16 = 0x0020;
3807
3808 let mut attr_data = Vec::new();
3810 attr_data.push(0x00); match addr {
3812 SocketAddr::V4(v4) => {
3813 attr_data.push(0x01); let xor_port = v4.port() ^ (MAGIC_COOKIE >> 16) as u16;
3815 attr_data.extend_from_slice(&xor_port.to_be_bytes());
3816 let ip_bits: u32 = (*v4.ip()).into();
3817 let xor_ip = ip_bits ^ MAGIC_COOKIE;
3818 attr_data.extend_from_slice(&xor_ip.to_be_bytes());
3819 }
3820 SocketAddr::V6(v6) => {
3821 attr_data.push(0x02); let xor_port = v6.port() ^ (MAGIC_COOKIE >> 16) as u16;
3823 attr_data.extend_from_slice(&xor_port.to_be_bytes());
3824 let ip_bytes = v6.ip().octets();
3825 let mut xor_key = [0u8; 16];
3826 xor_key[0..4].copy_from_slice(&MAGIC_COOKIE.to_be_bytes());
3827 xor_key[4..16].copy_from_slice(transaction_id);
3828 for i in 0..16 {
3829 attr_data.push(ip_bytes[i] ^ xor_key[i]);
3830 }
3831 }
3832 }
3833
3834 #[allow(clippy::cast_possible_truncation)]
3835 let attr_len = attr_data.len() as u16;
3836 #[allow(clippy::cast_possible_truncation)]
3837 let padded_attr_len = ((attr_data.len() + 3) & !3) as u16;
3838 let msg_len = 4 + padded_attr_len;
3839
3840 let mut buf = Vec::with_capacity(20 + msg_len as usize);
3841
3842 buf.extend_from_slice(&BINDING_RESPONSE.to_be_bytes());
3844 buf.extend_from_slice(&msg_len.to_be_bytes());
3845 buf.extend_from_slice(&MAGIC_COOKIE.to_be_bytes());
3846 buf.extend_from_slice(transaction_id);
3847
3848 buf.extend_from_slice(&ATTR_XOR_MAPPED_ADDRESS.to_be_bytes());
3850 buf.extend_from_slice(&attr_len.to_be_bytes());
3851 buf.extend_from_slice(&attr_data);
3852
3853 let padding = (4 - (attr_data.len() % 4)) % 4;
3855 buf.extend(std::iter::repeat_n(0u8, padding));
3856
3857 buf
3858 }
3859
3860 fn spawn_mock_stun_server(
3863 socket: tokio::net::UdpSocket,
3864 external_addr: SocketAddr,
3865 count: usize,
3866 ) -> tokio::task::JoinHandle<()> {
3867 tokio::spawn(async move {
3868 for _ in 0..count {
3869 let mut buf = [0u8; 576];
3870 let (_, from) = socket.recv_from(&mut buf).await.expect("recv");
3871 let mut txn_id = [0u8; 12];
3872 txn_id.copy_from_slice(&buf[8..20]);
3873 let response = build_stun_binding_response(external_addr, &txn_id);
3874 socket.send_to(&response, from).await.expect("send");
3875 }
3876 })
3877 }
3878
3879 struct MockReachabilityProbe {
3881 reachable: std::sync::atomic::AtomicBool,
3883 }
3884
3885 impl MockReachabilityProbe {
3886 fn new(reachable: bool) -> Self {
3887 Self {
3888 reachable: std::sync::atomic::AtomicBool::new(reachable),
3889 }
3890 }
3891 }
3892
3893 impl scp_transport::nat::ReachabilityProbe for MockReachabilityProbe {
3894 fn probe_reachability<'a>(
3895 &'a self,
3896 _socket: &'a tokio::net::UdpSocket,
3897 _external_addr: SocketAddr,
3898 ) -> std::pin::Pin<
3899 Box<
3900 dyn std::future::Future<Output = Result<bool, scp_transport::TransportError>>
3901 + Send
3902 + 'a,
3903 >,
3904 > {
3905 let reachable = self.reachable.load(std::sync::atomic::Ordering::Relaxed);
3906 Box::pin(async move { Ok(reachable) })
3907 }
3908 }
3909
3910 struct MockPortMapper {
3912 result: tokio::sync::Mutex<
3913 Option<
3914 Result<scp_transport::nat::PortMappingResult, scp_transport::nat::PortMappingError>,
3915 >,
3916 >,
3917 }
3918
3919 impl MockPortMapper {
3920 fn ok(addr: SocketAddr) -> Self {
3921 Self {
3922 result: tokio::sync::Mutex::new(Some(Ok(scp_transport::nat::PortMappingResult {
3923 external_addr: addr,
3924 ttl: std::time::Duration::from_secs(600),
3925 protocol: scp_transport::nat::MappingProtocol::UpnpIgd,
3926 }))),
3927 }
3928 }
3929
3930 fn fail(msg: &str) -> Self {
3931 Self {
3932 result: tokio::sync::Mutex::new(Some(Err(
3933 scp_transport::nat::PortMappingError::DiscoveryFailed(msg.to_owned()),
3934 ))),
3935 }
3936 }
3937 }
3938
3939 impl scp_transport::nat::PortMapper for MockPortMapper {
3940 fn map_port(
3941 &self,
3942 _internal_port: u16,
3943 ) -> std::pin::Pin<
3944 Box<
3945 dyn std::future::Future<
3946 Output = Result<
3947 scp_transport::nat::PortMappingResult,
3948 scp_transport::nat::PortMappingError,
3949 >,
3950 > + Send
3951 + '_,
3952 >,
3953 > {
3954 Box::pin(async {
3955 let mut r = self.result.lock().await;
3956 r.take().unwrap_or_else(|| {
3957 Err(scp_transport::nat::PortMappingError::Internal(
3958 "no more results".to_owned(),
3959 ))
3960 })
3961 })
3962 }
3963
3964 fn renew(
3965 &self,
3966 _internal_port: u16,
3967 ) -> std::pin::Pin<
3968 Box<
3969 dyn std::future::Future<
3970 Output = Result<
3971 scp_transport::nat::PortMappingResult,
3972 scp_transport::nat::PortMappingError,
3973 >,
3974 > + Send
3975 + '_,
3976 >,
3977 > {
3978 Box::pin(async {
3979 Err(scp_transport::nat::PortMappingError::Internal(
3980 "renew not expected".to_owned(),
3981 ))
3982 })
3983 }
3984
3985 fn remove(
3986 &self,
3987 _internal_port: u16,
3988 ) -> std::pin::Pin<
3989 Box<
3990 dyn std::future::Future<Output = Result<(), scp_transport::nat::PortMappingError>>
3991 + Send
3992 + '_,
3993 >,
3994 > {
3995 Box::pin(async { Ok(()) })
3996 }
3997 }
3998
3999 #[tokio::test]
4005 async fn default_nat_strategy_upnp_self_test_failure_falls_through_to_bridge() {
4006 let stun = tokio::net::UdpSocket::bind("127.0.0.1:0")
4008 .await
4009 .expect("bind");
4010 let stun_addr = stun.local_addr().expect("addr");
4011 let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
4012
4013 let h = spawn_mock_stun_server(stun, stun_external, 1);
4015
4016 let upnp_external = SocketAddr::from(([198, 51, 100, 1], 8443_u16));
4018 let mapper = Arc::new(MockPortMapper::ok(upnp_external));
4019 let probe = Arc::new(MockReachabilityProbe::new(false));
4020
4021 let strategy = DefaultNatStrategy::new(
4022 Some(stun_addr.to_string()),
4023 Some("wss://bridge.example.com/scp/v1".to_owned()),
4024 )
4025 .with_port_mapper(mapper)
4026 .with_reachability_probe(probe);
4027
4028 let tier = strategy.select_tier(4000).await.expect("should succeed");
4029
4030 assert!(
4033 matches!(tier, ReachabilityTier::Bridge { .. }),
4034 "should fall through to bridge when all self-tests fail, got: {tier:?}"
4035 );
4036
4037 h.await.expect("server");
4038 }
4039
4040 #[tokio::test]
4042 async fn default_nat_strategy_upnp_self_test_success_returns_tier1() {
4043 let stun = tokio::net::UdpSocket::bind("127.0.0.1:0")
4044 .await
4045 .expect("bind");
4046 let stun_addr = stun.local_addr().expect("addr");
4047 let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
4048
4049 let h = spawn_mock_stun_server(stun, stun_external, 1);
4051
4052 let upnp_external = SocketAddr::from(([198, 51, 100, 1], 8443_u16));
4054 let mapper = Arc::new(MockPortMapper::ok(upnp_external));
4055 let probe = Arc::new(MockReachabilityProbe::new(true));
4056
4057 let strategy = DefaultNatStrategy::new(Some(stun_addr.to_string()), None)
4058 .with_port_mapper(mapper)
4059 .with_reachability_probe(probe);
4060
4061 let tier = strategy.select_tier(4000).await.expect("should succeed");
4062
4063 match tier {
4064 ReachabilityTier::Upnp { external_addr } => {
4065 assert_eq!(external_addr, upnp_external);
4066 }
4067 other => panic!("expected Tier 1 Upnp, got: {other:?}"),
4068 }
4069
4070 h.await.expect("server");
4071 }
4072
4073 #[tokio::test]
4076 async fn default_nat_strategy_upnp_mapping_failure_falls_through_to_stun() {
4077 let stun = tokio::net::UdpSocket::bind("127.0.0.1:0")
4078 .await
4079 .expect("bind");
4080 let stun_addr = stun.local_addr().expect("addr");
4081 let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
4082
4083 let h = spawn_mock_stun_server(stun, stun_external, 1);
4087
4088 let mapper = Arc::new(MockPortMapper::fail("no UPnP gateway"));
4090 let probe = Arc::new(MockReachabilityProbe::new(true));
4091
4092 let strategy = DefaultNatStrategy::new(
4093 Some(stun_addr.to_string()),
4094 Some("wss://bridge.example.com/scp/v1".to_owned()),
4095 )
4096 .with_port_mapper(mapper)
4097 .with_reachability_probe(probe);
4098
4099 let tier = strategy.select_tier(4000).await.expect("should succeed");
4100
4101 match tier {
4102 ReachabilityTier::Stun { external_addr } => {
4103 assert_eq!(external_addr, stun_external);
4104 }
4105 other => panic!("expected Tier 2 Stun, got: {other:?}"),
4106 }
4107
4108 h.await.expect("server");
4109 }
4110
4111 #[tokio::test]
4113 async fn default_nat_strategy_no_port_mapper_skips_tier1() {
4114 let stun = tokio::net::UdpSocket::bind("127.0.0.1:0")
4115 .await
4116 .expect("bind");
4117 let stun_addr = stun.local_addr().expect("addr");
4118 let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
4119
4120 let h = spawn_mock_stun_server(stun, stun_external, 1);
4122
4123 let probe = Arc::new(MockReachabilityProbe::new(true));
4124
4125 let strategy = DefaultNatStrategy::new(Some(stun_addr.to_string()), None)
4126 .with_reachability_probe(probe);
4127
4128 let tier = strategy.select_tier(4000).await.expect("should succeed");
4129
4130 match tier {
4131 ReachabilityTier::Stun { external_addr } => {
4132 assert_eq!(external_addr, stun_external);
4133 }
4134 other => panic!("expected Tier 2 Stun, got: {other:?}"),
4135 }
4136
4137 h.await.expect("server");
4138 }
4139
4140 #[tokio::test]
4142 async fn default_nat_strategy_stun_self_test_failure_falls_through_to_bridge() {
4143 let stun = tokio::net::UdpSocket::bind("127.0.0.1:0")
4144 .await
4145 .expect("bind");
4146 let stun_addr = stun.local_addr().expect("addr");
4147 let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
4148
4149 let h = spawn_mock_stun_server(stun, stun_external, 1);
4151
4152 let probe = Arc::new(MockReachabilityProbe::new(false));
4154
4155 let strategy = DefaultNatStrategy::new(
4156 Some(stun_addr.to_string()),
4157 Some("wss://bridge.example.com/scp/v1".to_owned()),
4158 )
4159 .with_reachability_probe(probe);
4160
4161 let tier = strategy.select_tier(4000).await.expect("should succeed");
4162
4163 match tier {
4164 ReachabilityTier::Bridge { bridge_url } => {
4165 assert_eq!(bridge_url, "wss://bridge.example.com/scp/v1");
4166 }
4167 other => panic!("expected Tier 3 Bridge, got: {other:?}"),
4168 }
4169
4170 h.await.expect("server");
4171 }
4172
4173 mod http_tests {
4176 use super::*;
4177 use axum::body::Body;
4178 use axum::http::{Request, StatusCode};
4179 use scp_core::well_known::WellKnownScp;
4180 use tower::ServiceExt;
4181
4182 async fn build_test_node() -> ApplicationNode<InMemoryStorage> {
4185 test_builder()
4186 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4187 .build_for_testing()
4188 .await
4189 .unwrap()
4190 }
4191
4192 #[tokio::test]
4193 async fn well_known_returns_valid_json() {
4194 let node = build_test_node().await;
4195 let router = node.well_known_router();
4196
4197 let request = Request::builder()
4198 .uri("/.well-known/scp")
4199 .body(Body::empty())
4200 .unwrap();
4201
4202 let response = router.oneshot(request).await.unwrap();
4203
4204 assert_eq!(response.status(), StatusCode::OK);
4205
4206 let content_type = response
4208 .headers()
4209 .get("content-type")
4210 .expect("should have content-type header")
4211 .to_str()
4212 .unwrap();
4213 assert!(
4214 content_type.contains("application/json"),
4215 "Content-Type should be application/json, got: {content_type}"
4216 );
4217
4218 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4220 .await
4221 .unwrap();
4222 let doc: WellKnownScp = serde_json::from_slice(&body).unwrap();
4223
4224 assert_eq!(doc.version, 1);
4225 assert!(
4226 doc.did.starts_with("did:dht:"),
4227 "DID should be the node's DID, got: {}",
4228 doc.did
4229 );
4230 assert_eq!(doc.relay, "wss://test.example.com/scp/v1");
4231 assert!(doc.contexts.is_none(), "no contexts registered yet");
4232 }
4233
4234 #[tokio::test]
4235 async fn well_known_includes_registered_broadcast_contexts() {
4236 let node = build_test_node().await;
4237
4238 node.register_broadcast_context("abc123".to_owned(), Some("Test Broadcast".to_owned()))
4240 .await
4241 .unwrap();
4242
4243 let router = node.well_known_router();
4244
4245 let request = Request::builder()
4246 .uri("/.well-known/scp")
4247 .body(Body::empty())
4248 .unwrap();
4249
4250 let response = router.oneshot(request).await.unwrap();
4251 assert_eq!(response.status(), StatusCode::OK);
4252
4253 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4254 .await
4255 .unwrap();
4256 let doc: WellKnownScp = serde_json::from_slice(&body).unwrap();
4257
4258 let contexts = doc.contexts.expect("should have contexts");
4259 assert_eq!(contexts.len(), 1);
4260 assert_eq!(contexts[0].id, "abc123");
4261 assert_eq!(contexts[0].name.as_deref(), Some("Test Broadcast"));
4262 assert_eq!(contexts[0].mode.as_deref(), Some("broadcast"));
4263 assert!(
4264 contexts[0]
4265 .uri
4266 .as_ref()
4267 .unwrap()
4268 .starts_with("scp://context/abc123"),
4269 "URI should start with scp://context/abc123, got: {}",
4270 contexts[0].uri.as_ref().unwrap()
4271 );
4272 }
4273
4274 #[tokio::test]
4275 async fn well_known_dynamic_updates_on_new_context() {
4276 let node = build_test_node().await;
4277
4278 let router = node.well_known_router();
4280 let request = Request::builder()
4281 .uri("/.well-known/scp")
4282 .body(Body::empty())
4283 .unwrap();
4284 let response = router.oneshot(request).await.unwrap();
4285 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4286 .await
4287 .unwrap();
4288 let doc: WellKnownScp = serde_json::from_slice(&body).unwrap();
4289 assert!(doc.contexts.is_none());
4290
4291 node.register_broadcast_context("def456".to_owned(), None)
4293 .await
4294 .unwrap();
4295
4296 let router = node.well_known_router();
4298 let request = Request::builder()
4299 .uri("/.well-known/scp")
4300 .body(Body::empty())
4301 .unwrap();
4302 let response = router.oneshot(request).await.unwrap();
4303 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4304 .await
4305 .unwrap();
4306 let doc: WellKnownScp = serde_json::from_slice(&body).unwrap();
4307
4308 let contexts = doc.contexts.expect("should now have contexts");
4309 assert_eq!(contexts.len(), 1);
4310 assert_eq!(contexts[0].id, "def456");
4311 }
4312
4313 #[tokio::test]
4314 async fn relay_router_upgrades_websocket() {
4315 let node = build_test_node().await;
4316 let _relay_addr = node.relay().bound_addr();
4317
4318 let relay_router = node.relay_router();
4320 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4321 let http_addr = listener.local_addr().unwrap();
4322
4323 let server_handle = tokio::spawn(async move {
4324 axum::serve(listener, relay_router).await.unwrap();
4325 });
4326
4327 let url = format!("ws://{http_addr}/scp/v1");
4329 let connect_result = tokio_tungstenite::connect_async(&url).await;
4330
4331 assert!(
4332 connect_result.is_ok(),
4333 "WebSocket upgrade at /scp/v1 should succeed, got error: {:?}",
4334 connect_result.err()
4335 );
4336
4337 server_handle.abort();
4339 let _ = server_handle.await;
4340 }
4341
4342 #[tokio::test]
4343 async fn custom_app_routes_merge_with_scp_routes() {
4344 let node = build_test_node().await;
4345
4346 let app_router =
4348 axum::Router::new().route("/health", axum::routing::get(|| async { "ok" }));
4349
4350 let well_known = node.well_known_router();
4352 let merged = app_router.merge(well_known);
4353
4354 let request = Request::builder()
4356 .uri("/health")
4357 .body(Body::empty())
4358 .unwrap();
4359 let response = merged.clone().oneshot(request).await.unwrap();
4360 assert_eq!(response.status(), StatusCode::OK);
4361
4362 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4363 .await
4364 .unwrap();
4365 assert_eq!(&body[..], b"ok");
4366
4367 let request = Request::builder()
4369 .uri("/.well-known/scp")
4370 .body(Body::empty())
4371 .unwrap();
4372 let response = merged.oneshot(request).await.unwrap();
4373 assert_eq!(response.status(), StatusCode::OK);
4374
4375 let body = axum::body::to_bytes(response.into_body(), 1024 * 1024)
4376 .await
4377 .unwrap();
4378 let doc: WellKnownScp = serde_json::from_slice(&body).unwrap();
4379 assert_eq!(doc.version, 1);
4380 }
4381 }
4382
4383 struct SequenceNatStrategy {
4388 tiers: std::sync::Mutex<Vec<ReachabilityTier>>,
4389 call_count: std::sync::atomic::AtomicU32,
4390 }
4391
4392 impl SequenceNatStrategy {
4393 fn new(tiers: Vec<ReachabilityTier>) -> Self {
4394 Self {
4395 tiers: std::sync::Mutex::new(tiers),
4396 call_count: std::sync::atomic::AtomicU32::new(0),
4397 }
4398 }
4399 }
4400
4401 impl NatStrategy for SequenceNatStrategy {
4402 fn select_tier(
4403 &self,
4404 _relay_port: u16,
4405 ) -> std::pin::Pin<
4406 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
4407 > {
4408 let idx = self
4409 .call_count
4410 .fetch_add(1, std::sync::atomic::Ordering::SeqCst) as usize;
4411 let tiers = self.tiers.lock().unwrap();
4412 let tier = tiers[idx % tiers.len()].clone();
4414 drop(tiers);
4415 Box::pin(async move { Ok(tier) })
4416 }
4417 }
4418
4419 struct RecordingPublisher {
4421 publish_count: std::sync::atomic::AtomicU32,
4422 }
4423
4424 impl RecordingPublisher {
4425 fn new() -> Self {
4426 Self {
4427 publish_count: std::sync::atomic::AtomicU32::new(0),
4428 }
4429 }
4430
4431 fn count(&self) -> u32 {
4432 self.publish_count.load(std::sync::atomic::Ordering::SeqCst)
4433 }
4434 }
4435
4436 impl DidPublisher for RecordingPublisher {
4437 fn publish<'a>(
4438 &'a self,
4439 _identity: &'a ScpIdentity,
4440 _document: &'a DidDocument,
4441 ) -> std::pin::Pin<
4442 Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>,
4443 > {
4444 self.publish_count
4445 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
4446 Box::pin(async { Ok(()) })
4447 }
4448 }
4449
4450 const TEST_REEVALUATION_INTERVAL: Duration = Duration::from_millis(50);
4452
4453 const TEST_EVENT_TIMEOUT: Duration = Duration::from_secs(5);
4455
4456 #[tokio::test]
4457 async fn tier_change_after_30_minutes_triggers_did_republish() {
4458 let initial_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
4462 let new_addr = SocketAddr::from(([203, 0, 113, 42], 8443));
4463
4464 let strategy = Arc::new(SequenceNatStrategy::new(vec![
4466 ReachabilityTier::Stun {
4467 external_addr: initial_addr,
4468 },
4469 ReachabilityTier::Upnp {
4470 external_addr: new_addr,
4471 },
4472 ]));
4473
4474 let publisher = Arc::new(RecordingPublisher::new());
4475 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);
4476
4477 let identity = ScpIdentity {
4478 identity_key: scp_platform::KeyHandle::new(1),
4479 active_signing_key: scp_platform::KeyHandle::new(2),
4480 agent_signing_key: None,
4481 pre_rotation_commitment: [0u8; 32],
4482 did: "did:dht:test123".to_owned(),
4483 };
4484
4485 let document = DidDocument {
4486 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
4487 id: "did:dht:test123".to_owned(),
4488 verification_method: vec![],
4489 authentication: vec![],
4490 assertion_method: vec![],
4491 also_known_as: vec![],
4492 service: vec![scp_identity::document::Service {
4493 id: "did:dht:test123#scp-relay-1".to_owned(),
4494 service_type: "SCPRelay".to_owned(),
4495 service_endpoint: "ws://198.51.100.7:32891/scp/v1".to_owned(),
4496 }],
4497 };
4498
4499 let handle = spawn_tier_reevaluation(
4500 Arc::clone(&strategy) as Arc<dyn NatStrategy>,
4501 None,
4502 Arc::clone(&publisher) as Arc<dyn DidPublisher>,
4503 identity,
4504 document,
4505 32891,
4506 "ws://198.51.100.7:32891/scp/v1".to_owned(),
4507 Some(event_tx),
4508 TEST_REEVALUATION_INTERVAL,
4509 );
4510
4511 let event = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
4513 .await
4514 .expect("timeout waiting for tier change event")
4515 .expect("channel closed unexpectedly");
4516
4517 match event {
4518 NatTierChange::TierChanged {
4519 previous_relay_url,
4520 new_relay_url,
4521 reason,
4522 } => {
4523 assert_eq!(previous_relay_url, "ws://198.51.100.7:32891/scp/v1");
4524 assert_eq!(new_relay_url, "ws://203.0.113.42:8443/scp/v1");
4525 assert!(
4526 reason.contains("periodic"),
4527 "reason should mention periodic: {reason}"
4528 );
4529 }
4530 other => panic!("expected TierChanged, got {other:?}"),
4531 }
4532
4533 assert_eq!(
4535 publisher.count(),
4536 1,
4537 "DID document should be republished after tier change"
4538 );
4539
4540 handle.stop();
4541 }
4542
4543 #[tokio::test]
4544 async fn network_event_triggers_immediate_reevaluation() {
4545 let new_addr = SocketAddr::from(([10, 0, 0, 1], 9999));
4548
4549 let strategy = Arc::new(SequenceNatStrategy::new(vec![ReachabilityTier::Stun {
4553 external_addr: new_addr,
4554 }]));
4555
4556 let publisher = Arc::new(RecordingPublisher::new());
4557 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);
4558
4559 let (net_change_tx, net_change_rx) = tokio::sync::mpsc::channel(16);
4561 let detector = Arc::new(scp_transport::nat::ChannelNetworkChangeDetector::new(
4562 net_change_rx,
4563 ));
4564
4565 let identity = ScpIdentity {
4566 identity_key: scp_platform::KeyHandle::new(1),
4567 active_signing_key: scp_platform::KeyHandle::new(2),
4568 agent_signing_key: None,
4569 pre_rotation_commitment: [0u8; 32],
4570 did: "did:dht:testnet123".to_owned(),
4571 };
4572
4573 let document = DidDocument {
4574 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
4575 id: "did:dht:testnet123".to_owned(),
4576 verification_method: vec![],
4577 authentication: vec![],
4578 assertion_method: vec![],
4579 also_known_as: vec![],
4580 service: vec![scp_identity::document::Service {
4581 id: "did:dht:testnet123#scp-relay-1".to_owned(),
4582 service_type: "SCPRelay".to_owned(),
4583 service_endpoint: "ws://198.51.100.7:32891/scp/v1".to_owned(),
4584 }],
4585 };
4586
4587 let handle = spawn_tier_reevaluation(
4588 Arc::clone(&strategy) as Arc<dyn NatStrategy>,
4589 Some(detector as Arc<dyn NetworkChangeDetector>),
4590 Arc::clone(&publisher) as Arc<dyn DidPublisher>,
4591 identity,
4592 document,
4593 32891,
4594 "ws://198.51.100.7:32891/scp/v1".to_owned(),
4595 Some(event_tx),
4596 Duration::from_secs(60 * 60),
4598 );
4599
4600 tokio::task::yield_now().await;
4603 tokio::time::sleep(Duration::from_millis(10)).await;
4604
4605 net_change_tx.send(()).await.expect("send network change");
4607
4608 let event = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
4610 .await
4611 .expect("timeout waiting for tier change event")
4612 .expect("channel closed unexpectedly");
4613
4614 match event {
4615 NatTierChange::TierChanged {
4616 previous_relay_url,
4617 new_relay_url,
4618 reason,
4619 } => {
4620 assert_eq!(previous_relay_url, "ws://198.51.100.7:32891/scp/v1");
4621 assert_eq!(new_relay_url, "ws://10.0.0.1:9999/scp/v1");
4622 assert!(
4623 reason.contains("network change"),
4624 "reason should mention network change: {reason}"
4625 );
4626 }
4627 other => panic!("expected TierChanged, got {other:?}"),
4628 }
4629
4630 assert_eq!(
4632 publisher.count(),
4633 1,
4634 "DID document should be republished after network change"
4635 );
4636
4637 handle.stop();
4638 }
4639
4640 #[tokio::test]
4641 async fn no_event_when_tier_unchanged_after_reevaluation() {
4642 let addr = SocketAddr::from(([198, 51, 100, 7], 32891));
4644
4645 let strategy = Arc::new(SequenceNatStrategy::new(vec![ReachabilityTier::Stun {
4647 external_addr: addr,
4648 }]));
4649
4650 let publisher = Arc::new(RecordingPublisher::new());
4651 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);
4652
4653 let identity = ScpIdentity {
4654 identity_key: scp_platform::KeyHandle::new(1),
4655 active_signing_key: scp_platform::KeyHandle::new(2),
4656 agent_signing_key: None,
4657 pre_rotation_commitment: [0u8; 32],
4658 did: "did:dht:unchanged123".to_owned(),
4659 };
4660
4661 let document = DidDocument {
4662 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
4663 id: "did:dht:unchanged123".to_owned(),
4664 verification_method: vec![],
4665 authentication: vec![],
4666 assertion_method: vec![],
4667 also_known_as: vec![],
4668 service: vec![scp_identity::document::Service {
4669 id: "did:dht:unchanged123#scp-relay-1".to_owned(),
4670 service_type: "SCPRelay".to_owned(),
4671 service_endpoint: "ws://198.51.100.7:32891/scp/v1".to_owned(),
4672 }],
4673 };
4674
4675 let handle = spawn_tier_reevaluation(
4676 Arc::clone(&strategy) as Arc<dyn NatStrategy>,
4677 None,
4678 Arc::clone(&publisher) as Arc<dyn DidPublisher>,
4679 identity,
4680 document,
4681 32891,
4682 "ws://198.51.100.7:32891/scp/v1".to_owned(),
4683 Some(event_tx),
4684 TEST_REEVALUATION_INTERVAL,
4685 );
4686
4687 tokio::time::sleep(Duration::from_millis(200)).await;
4690
4691 assert_eq!(
4693 publisher.count(),
4694 0,
4695 "DID document should NOT be republished when tier is unchanged"
4696 );
4697
4698 let recv_result = event_rx.try_recv();
4700 assert!(
4701 recv_result.is_err(),
4702 "no TierChanged event should be emitted when tier is unchanged"
4703 );
4704
4705 handle.stop();
4706 }
4707
4708 struct FailThenSucceedStrategy {
4710 call_count: std::sync::atomic::AtomicU32,
4711 success_tier: ReachabilityTier,
4712 }
4713
4714 impl NatStrategy for FailThenSucceedStrategy {
4715 fn select_tier(
4716 &self,
4717 _relay_port: u16,
4718 ) -> std::pin::Pin<
4719 Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
4720 > {
4721 let n = self
4722 .call_count
4723 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
4724 let tier = self.success_tier.clone();
4725 Box::pin(async move {
4726 if n == 0 {
4727 Err(NodeError::Nat("transient STUN failure".into()))
4728 } else {
4729 Ok(tier)
4730 }
4731 })
4732 }
4733 }
4734
4735 #[tokio::test]
4736 async fn reevaluation_loop_survives_nat_probe_failure() {
4737 let addr = SocketAddr::from(([198, 51, 100, 7], 32891));
4739 let new_addr = SocketAddr::from(([10, 0, 0, 1], 5000));
4740
4741 let strategy = Arc::new(FailThenSucceedStrategy {
4742 call_count: std::sync::atomic::AtomicU32::new(0),
4743 success_tier: ReachabilityTier::Stun {
4744 external_addr: new_addr,
4745 },
4746 });
4747
4748 let publisher = Arc::new(RecordingPublisher::new());
4749 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);
4750
4751 let identity = ScpIdentity {
4752 identity_key: scp_platform::KeyHandle::new(1),
4753 active_signing_key: scp_platform::KeyHandle::new(2),
4754 agent_signing_key: None,
4755 pre_rotation_commitment: [0u8; 32],
4756 did: "did:dht:resilient123".to_owned(),
4757 };
4758
4759 let document = DidDocument {
4760 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
4761 id: "did:dht:resilient123".to_owned(),
4762 verification_method: vec![],
4763 authentication: vec![],
4764 assertion_method: vec![],
4765 also_known_as: vec![],
4766 service: vec![scp_identity::document::Service {
4767 id: "did:dht:resilient123#scp-relay-1".to_owned(),
4768 service_type: "SCPRelay".to_owned(),
4769 service_endpoint: format!("ws://{addr}/scp/v1"),
4770 }],
4771 };
4772
4773 let handle = spawn_tier_reevaluation(
4774 strategy as Arc<dyn NatStrategy>,
4775 None,
4776 Arc::clone(&publisher) as Arc<dyn DidPublisher>,
4777 identity,
4778 document,
4779 addr.port(),
4780 format!("ws://{addr}/scp/v1"),
4781 Some(event_tx),
4782 TEST_REEVALUATION_INTERVAL,
4783 );
4784
4785 let event = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
4789 .await
4790 .expect("timeout waiting for tier change event after recovery")
4791 .expect("channel closed unexpectedly");
4792 assert!(matches!(event, NatTierChange::TierChanged { .. }));
4793
4794 assert_eq!(
4797 publisher.count(),
4798 1,
4799 "republish after successful re-evaluation"
4800 );
4801
4802 handle.stop();
4803 }
4804
4805 #[tokio::test]
4806 async fn no_domain_build_spawns_tier_reevaluation_task() {
4807 let tier = ReachabilityTier::Stun {
4809 external_addr: SocketAddr::from(([198, 51, 100, 7], 32891)),
4810 };
4811
4812 let node = test_no_domain_builder(tier)
4813 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4814 .build_for_testing()
4815 .await
4816 .unwrap();
4817
4818 assert!(
4819 node.tier_reeval.is_some(),
4820 "no-domain mode should spawn the tier re-evaluation task"
4821 );
4822 assert!(
4823 node.tier_change_rx.is_some(),
4824 "no-domain mode should provide a tier change event channel"
4825 );
4826
4827 node.shutdown();
4828 }
4829
4830 #[tokio::test]
4831 async fn domain_build_does_not_spawn_tier_reevaluation_task() {
4832 let node = test_builder()
4835 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4836 .build_for_testing()
4837 .await
4838 .unwrap();
4839
4840 assert!(
4841 node.tier_reeval.is_none(),
4842 "domain mode should NOT spawn the tier re-evaluation task"
4843 );
4844 assert!(
4845 node.tier_change_rx.is_none(),
4846 "domain mode should NOT provide a tier change event channel"
4847 );
4848
4849 node.shutdown();
4850 }
4851
4852 #[tokio::test]
4857 async fn identity_with_storage_creates_and_persists_on_first_run() {
4858 let storage = Arc::new(InMemoryStorage::new());
4860 let custody = Arc::new(InMemoryKeyCustody::new());
4861 let did_method = Arc::new(make_test_dht(&custody));
4862
4863 let node = ApplicationNodeBuilder::new()
4864 .storage(Arc::clone(&storage))
4865 .domain("persist.example.com")
4866 .tls_provider(Arc::new(SucceedingTlsProvider {
4867 domain: "persist.example.com".to_owned(),
4868 }))
4869 .identity_with_storage(custody, did_method)
4870 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4871 .build_for_testing()
4872 .await
4873 .unwrap();
4874
4875 let did = node.identity().did().to_owned();
4876 assert!(
4877 did.starts_with("did:dht:"),
4878 "DID should start with did:dht:"
4879 );
4880
4881 let stored = storage
4883 .retrieve(IDENTITY_STORAGE_KEY)
4884 .await
4885 .unwrap()
4886 .expect("identity should be persisted to storage");
4887 let envelope: StoredValue<PersistedIdentity> = rmp_serde::from_slice(&stored).unwrap();
4888 assert_eq!(envelope.version, CURRENT_STORE_VERSION);
4889 assert_eq!(envelope.data.identity.did, did);
4890 assert_eq!(envelope.data.document.id, did);
4891
4892 node.shutdown();
4893 }
4894
4895 #[tokio::test]
4896 async fn identity_with_storage_reloads_on_subsequent_run() {
4897 let storage = Arc::new(InMemoryStorage::new());
4899 let custody = Arc::new(InMemoryKeyCustody::new());
4900 let did_method = Arc::new(make_test_dht(&custody));
4901
4902 let node1 = ApplicationNodeBuilder::new()
4903 .storage(Arc::clone(&storage))
4904 .domain("reload.example.com")
4905 .tls_provider(Arc::new(SucceedingTlsProvider {
4906 domain: "reload.example.com".to_owned(),
4907 }))
4908 .identity_with_storage(Arc::clone(&custody), Arc::clone(&did_method))
4909 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4910 .build_for_testing()
4911 .await
4912 .unwrap();
4913
4914 let first_did = node1.identity().did().to_owned();
4915 node1.shutdown();
4916
4917 let node2 = ApplicationNodeBuilder::new()
4919 .storage(Arc::clone(&storage))
4920 .domain("reload.example.com")
4921 .tls_provider(Arc::new(SucceedingTlsProvider {
4922 domain: "reload.example.com".to_owned(),
4923 }))
4924 .identity_with_storage(custody, did_method)
4925 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4926 .build_for_testing()
4927 .await
4928 .unwrap();
4929
4930 assert_eq!(
4931 node2.identity().did(),
4932 first_did,
4933 "second run should produce the same DID"
4934 );
4935
4936 node2.shutdown();
4937 }
4938
4939 #[tokio::test]
4940 async fn identity_with_storage_rejects_mismatched_custody() {
4941 let storage = Arc::new(InMemoryStorage::new());
4943 let custody1 = Arc::new(InMemoryKeyCustody::new());
4944 let did_method = Arc::new(make_test_dht(&custody1));
4945
4946 let node1 = ApplicationNodeBuilder::new()
4947 .storage(Arc::clone(&storage))
4948 .domain("mismatch.example.com")
4949 .tls_provider(Arc::new(SucceedingTlsProvider {
4950 domain: "mismatch.example.com".to_owned(),
4951 }))
4952 .identity_with_storage(custody1, Arc::clone(&did_method))
4953 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4954 .build_for_testing()
4955 .await
4956 .unwrap();
4957
4958 node1.shutdown();
4959
4960 let custody2 = Arc::new(InMemoryKeyCustody::new());
4962 let did_method2 = Arc::new(make_test_dht(&custody2));
4963
4964 let result = ApplicationNodeBuilder::new()
4965 .storage(Arc::clone(&storage))
4966 .domain("mismatch.example.com")
4967 .tls_provider(Arc::new(SucceedingTlsProvider {
4968 domain: "mismatch.example.com".to_owned(),
4969 }))
4970 .identity_with_storage(custody2, did_method2)
4971 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
4972 .build_for_testing()
4973 .await;
4974
4975 let err = result
4976 .err()
4977 .expect("build should fail with mismatched custody");
4978 let msg = err.to_string();
4979 assert!(
4980 msg.contains("not found in custody"),
4981 "expected custody validation error, got: {msg}"
4982 );
4983 }
4984
4985 #[tokio::test]
4986 async fn identity_with_storage_stored_value_envelope_roundtrip() {
4987 use scp_platform::traits::KeyHandle;
4990 let persisted = PersistedIdentity {
4991 identity: ScpIdentity {
4992 identity_key: KeyHandle::new(1),
4993 active_signing_key: KeyHandle::new(2),
4994 agent_signing_key: None,
4995 pre_rotation_commitment: [0u8; 32],
4996 did: "did:dht:zroundtrip".to_owned(),
4997 },
4998 document: DidDocument {
4999 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
5000 id: "did:dht:zroundtrip".to_owned(),
5001 verification_method: vec![],
5002 authentication: vec![],
5003 assertion_method: vec![],
5004 also_known_as: vec![],
5005 service: vec![],
5006 },
5007 };
5008 let envelope = StoredValue {
5009 version: CURRENT_STORE_VERSION,
5010 data: &persisted,
5011 };
5012 let bytes = rmp_serde::to_vec_named(&envelope).unwrap();
5013 let decoded: StoredValue<PersistedIdentity> = rmp_serde::from_slice(&bytes).unwrap();
5014 assert_eq!(decoded.version, CURRENT_STORE_VERSION);
5015 assert_eq!(decoded.data.identity.did, "did:dht:zroundtrip");
5016 assert_eq!(decoded.data.document.id, "did:dht:zroundtrip");
5017 }
5018
5019 #[tokio::test]
5020 async fn identity_with_storage_rejects_future_version() {
5021 use scp_platform::traits::KeyHandle;
5025 let persisted = PersistedIdentity {
5026 identity: ScpIdentity {
5027 identity_key: KeyHandle::new(1),
5028 active_signing_key: KeyHandle::new(2),
5029 agent_signing_key: None,
5030 pre_rotation_commitment: [0u8; 32],
5031 did: "did:dht:zfuture".to_owned(),
5032 },
5033 document: DidDocument {
5034 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
5035 id: "did:dht:zfuture".to_owned(),
5036 verification_method: vec![],
5037 authentication: vec![],
5038 assertion_method: vec![],
5039 also_known_as: vec![],
5040 service: vec![],
5041 },
5042 };
5043 let future_version = CURRENT_STORE_VERSION + 1;
5044 let envelope = StoredValue {
5045 version: future_version,
5046 data: &persisted,
5047 };
5048 let bytes = rmp_serde::to_vec_named(&envelope).unwrap();
5049
5050 let storage = Arc::new(InMemoryStorage::new());
5051 storage.store(IDENTITY_STORAGE_KEY, &bytes).await.unwrap();
5052
5053 let custody = Arc::new(InMemoryKeyCustody::new());
5054 let did_method = Arc::new(make_test_dht(&custody));
5055
5056 let result = ApplicationNodeBuilder::new()
5057 .storage(Arc::clone(&storage))
5058 .domain("future-ver.example.com")
5059 .tls_provider(Arc::new(SucceedingTlsProvider {
5060 domain: "future-ver.example.com".to_owned(),
5061 }))
5062 .identity_with_storage(custody, did_method)
5063 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
5064 .build_for_testing()
5065 .await;
5066
5067 match result {
5068 Err(err) => {
5069 let msg = err.to_string();
5070 assert!(
5071 msg.contains("newer than supported version"),
5072 "expected future version rejection error, got: {msg}"
5073 );
5074 }
5075 Ok(node) => {
5076 node.shutdown();
5077 panic!("expected future version rejection, but build succeeded");
5078 }
5079 }
5080 }
5081
5082 #[tokio::test]
5083 async fn generate_identity_with_does_not_persist() {
5084 let storage = Arc::new(InMemoryStorage::new());
5086 let custody = Arc::new(InMemoryKeyCustody::new());
5087 let did_method = Arc::new(make_test_dht(&custody));
5088
5089 let node = ApplicationNodeBuilder::new()
5090 .storage(Arc::clone(&storage))
5091 .domain("nopersist.example.com")
5092 .tls_provider(Arc::new(SucceedingTlsProvider {
5093 domain: "nopersist.example.com".to_owned(),
5094 }))
5095 .generate_identity_with(custody, did_method)
5096 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
5097 .build_for_testing()
5098 .await
5099 .unwrap();
5100
5101 assert!(node.identity().did().starts_with("did:dht:"));
5102
5103 let stored = storage.retrieve(IDENTITY_STORAGE_KEY).await.unwrap();
5105 assert!(
5106 stored.is_none(),
5107 "generate_identity_with should NOT persist identity"
5108 );
5109
5110 node.shutdown();
5111 }
5112
5113 #[tokio::test]
5114 async fn identity_with_storage_no_domain_mode() {
5115 let storage = Arc::new(InMemoryStorage::new());
5117 let custody = Arc::new(InMemoryKeyCustody::new());
5118 let did_method = Arc::new(make_test_dht(&custody));
5119
5120 let tier = ReachabilityTier::Upnp {
5121 external_addr: SocketAddr::from(([1, 2, 3, 4], 9090)),
5122 };
5123 let node = ApplicationNodeBuilder::new()
5124 .storage(Arc::clone(&storage))
5125 .no_domain()
5126 .nat_strategy(Arc::new(MockNatStrategy { tier: tier.clone() }))
5127 .identity_with_storage(Arc::clone(&custody), Arc::clone(&did_method))
5128 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
5129 .build_for_testing()
5130 .await
5131 .unwrap();
5132
5133 let first_did = node.identity().did().to_owned();
5134 node.shutdown();
5135
5136 let node2 = ApplicationNodeBuilder::new()
5138 .storage(Arc::clone(&storage))
5139 .no_domain()
5140 .nat_strategy(Arc::new(MockNatStrategy { tier }))
5141 .identity_with_storage(custody, did_method)
5142 .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
5143 .build_for_testing()
5144 .await
5145 .unwrap();
5146
5147 assert_eq!(
5148 node2.identity().did(),
5149 first_did,
5150 "no-domain mode should also reload persisted identity"
5151 );
5152
5153 node2.shutdown();
5154 }
5155}