#![forbid(unsafe_code)]
pub mod bridge_auth;
pub mod bridge_handlers;
pub mod dev_api;
pub(crate) mod error;
pub mod http;
pub mod projection;
pub mod tls;
mod well_known;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use scp_core::store::{CURRENT_STORE_VERSION, ProtocolStore, StoredValue};
use scp_identity::document::DidDocument;
use scp_identity::{DidMethod, IdentityError, ScpIdentity};
use scp_platform::EncryptedStorage;
use scp_platform::traits::{KeyCustody, Storage};
use scp_transport::nat::{NatTierChange, NetworkChangeDetector};
use scp_transport::native::server::{RelayConfig, RelayError, RelayServer, ShutdownHandle};
use scp_transport::native::storage::BlobStorageBackend;
use tokio_util::sync::CancellationToken;
use zeroize::Zeroizing;
pub use http::BroadcastContext;
pub use projection::ProjectedContext;
/// Fallback socket address for the HTTP listener: the IPv4 unspecified
/// address (`0.0.0.0`) on port 8443.
///
/// The builder falls back to this value when no `http_bind_addr` was set.
pub const DEFAULT_HTTP_BIND_ADDR: SocketAddr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 8443);
/// Upper bound on the number of distinct broadcast contexts accepted by
/// `ApplicationNode::register_broadcast_context`.
pub(crate) const MAX_BROADCAST_CONTEXTS: usize = 1024;
/// Fallback projection rate limit used when the builder's
/// `projection_rate_limit` was not set.
// NOTE(review): the unit (presumably requests per time window) is not visible in this file — confirm.
pub const DEFAULT_PROJECTION_RATE_LIMIT: u32 = 60;
/// Errors surfaced while configuring, building, or operating an application node.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
/// A required builder field (named by the payload) was not provided.
#[error("missing required field: {0}")]
MissingField(&'static str),
/// An error propagated from the identity layer (converted via `From`).
#[error("identity error: {0}")]
Identity(#[from] IdentityError),
/// An error propagated from the relay server (converted via `From`).
#[error("relay error: {0}")]
Relay(#[from] RelayError),
/// A storage read, decode, or consistency failure, described as text.
#[error("storage error: {0}")]
Storage(String),
/// A caller-supplied value was rejected (e.g. a malformed context id or an
/// exceeded registry limit).
#[error("invalid configuration: {0}")]
InvalidConfig(String),
/// A failure while serving, described as text.
// NOTE(review): no producer of this variant is visible in this part of the file.
#[error("serve error: {0}")]
Serve(String),
/// NAT probing or reachability-tier selection failed.
#[error("NAT traversal error: {0}")]
Nat(String),
/// An error propagated from the TLS module (converted via `From`).
#[error("TLS error: {0}")]
Tls(#[from] tls::TlsError),
}
/// Handle to the relay server owned by an application node.
#[derive(Debug)]
pub struct RelayHandle {
/// Socket address recorded for the relay server.
bound_addr: SocketAddr,
/// Handle used to request relay shutdown.
shutdown_handle: ShutdownHandle,
}
impl RelayHandle {
/// Returns the relay socket address stored in this handle (by value).
#[must_use]
pub const fn bound_addr(&self) -> SocketAddr {
self.bound_addr
}
/// Borrows the relay's shutdown handle.
#[must_use]
pub const fn shutdown_handle(&self) -> &ShutdownHandle {
&self.shutdown_handle
}
}
/// The node's identity together with its DID document.
#[derive(Debug)]
pub struct IdentityHandle {
/// The SCP identity (its `did` field is exposed through `did()`).
identity: ScpIdentity,
/// The DID document paired with `identity`.
document: DidDocument,
}
impl IdentityHandle {
/// Borrows the underlying SCP identity.
#[must_use]
pub const fn identity(&self) -> &ScpIdentity {
&self.identity
}
/// Returns the identity's DID as a string slice.
#[must_use]
pub fn did(&self) -> &str {
&self.identity.did
}
/// Borrows the DID document held alongside the identity.
#[must_use]
pub const fn document(&self) -> &DidDocument {
&self.document
}
}
/// A built application node: relay handle, identity, protocol store, and the
/// shared HTTP state, plus optional NAT tier re-evaluation plumbing.
pub struct ApplicationNode<S: Storage> {
/// Domain name of the node, if it has one.
domain: Option<String>,
/// Handle to the relay server.
relay: RelayHandle,
/// Identity and DID document of the node.
identity: IdentityHandle,
/// Shared protocol store wrapping the storage backend `S`.
storage: Arc<ProtocolStore<S>>,
/// State shared with the HTTP layer (relay URL, registries, tokens, ...).
state: Arc<http::NodeState>,
/// Background tier re-evaluation task, when one is running.
tier_reeval: Option<TierReEvalHandle>,
/// Receiving end for `NatTierChange` events, when available.
tier_change_rx: Option<tokio::sync::mpsc::Receiver<NatTierChange>>,
/// HTTP/3 configuration (only with the `http3` feature).
#[cfg(feature = "http3")]
http3_config: Option<scp_transport::http3::Http3Config>,
}
/// Hand-written `Debug` for [`ApplicationNode`].
///
/// The storage backend is rendered as the opaque placeholder `"<Storage>"` and
/// never formatted itself, so no `S: Debug` bound is required; the impl is
/// therefore available for every `S: Storage`. The tier re-evaluation handle is
/// shown as `Some("<active>")` / `None`, and the remaining fields are elided
/// via `finish_non_exhaustive`.
impl<S: Storage> std::fmt::Debug for ApplicationNode<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ApplicationNode")
            .field("domain", &self.domain)
            .field("relay", &self.relay)
            .field("identity", &self.identity)
            // Placeholder only: the backend may hold secrets and need not be `Debug`.
            .field("storage", &"<Storage>")
            .field(
                "tier_reeval",
                &self.tier_reeval.as_ref().map(|_| "<active>"),
            )
            .finish_non_exhaustive()
    }
}
impl<S: Storage> ApplicationNode<S> {
    /// Domain name of this node, or `None` when it has none.
    #[must_use]
    pub fn domain(&self) -> Option<&str> {
        self.domain.as_ref().map(String::as_str)
    }

    /// Borrows the relay handle.
    #[must_use]
    pub const fn relay(&self) -> &RelayHandle {
        &self.relay
    }

    /// Borrows the identity handle.
    #[must_use]
    pub const fn identity(&self) -> &IdentityHandle {
        &self.identity
    }

    /// Borrows the protocol store behind the shared pointer.
    #[must_use]
    pub fn storage(&self) -> &ProtocolStore<S> {
        self.storage.as_ref()
    }

    /// Relay URL recorded in the shared node state.
    #[must_use]
    pub fn relay_url(&self) -> &str {
        &self.state.relay_url
    }

    /// Certificate resolver from the shared node state, if one is present.
    #[must_use]
    pub fn cert_resolver(&self) -> Option<&Arc<tls::CertResolver>> {
        self.state.cert_resolver.as_ref()
    }

    /// Registers (or replaces) a broadcast context under a lower-cased id.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::InvalidConfig`] when `id` is empty, longer than 64
    /// bytes, contains a non-hex character, or when the id is new and the
    /// registry already holds `MAX_BROADCAST_CONTEXTS` entries.
    pub async fn register_broadcast_context(
        &self,
        id: String,
        name: Option<String>,
    ) -> Result<(), NodeError> {
        let length_ok = (1..=64).contains(&id.len());
        if !length_ok {
            return Err(NodeError::InvalidConfig(
                "context id must be 1-64 hex characters".into(),
            ));
        }
        if id.bytes().any(|b| !b.is_ascii_hexdigit()) {
            return Err(NodeError::InvalidConfig(
                "context id must contain only hex characters".into(),
            ));
        }

        // Ids are stored in canonical lower-case form.
        let key = id.to_ascii_lowercase();
        let mut contexts = self.state.broadcast_contexts.write().await;

        // Re-registering an existing id is always allowed; only new ids count
        // against the limit.
        let registry_full = contexts.len() >= MAX_BROADCAST_CONTEXTS;
        if registry_full && !contexts.contains_key(&key) {
            return Err(NodeError::InvalidConfig(format!(
                "broadcast context limit ({MAX_BROADCAST_CONTEXTS}) reached",
            )));
        }

        let context = BroadcastContext {
            id: key.clone(),
            name,
        };
        contexts.insert(key, context);
        drop(contexts);
        Ok(())
    }

    /// Hex encoding of the bridge secret held in the shared node state.
    #[must_use]
    pub fn bridge_token_hex(&self) -> String {
        scp_transport::native::server::hex_encode_32(&self.state.bridge_secret)
    }

    /// Dev API token, if one was generated.
    #[must_use]
    pub fn dev_token(&self) -> Option<&str> {
        self.state.dev_token.as_deref()
    }

    /// Signals shutdown: relay first, then the shared cancellation token, then
    /// the tier re-evaluation task when one exists.
    pub fn shutdown(&self) {
        self.relay.shutdown_handle.shutdown();
        self.state.shutdown_token.cancel();
        if let Some(handle) = self.tier_reeval.as_ref() {
            handle.stop();
        }
    }

    /// Mutable access to the tier-change event receiver, if present.
    pub const fn tier_change_rx(
        &mut self,
    ) -> Option<&mut tokio::sync::mpsc::Receiver<NatTierChange>> {
        self.tier_change_rx.as_mut()
    }

    /// Maximum number of entries allowed in the projected-context registry.
    const MAX_PROJECTED_CONTEXTS: usize = 1024;

    /// Enables broadcast projection for `context_id`, or updates the existing
    /// entry (adds the key, overwrites admission and policy).
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::InvalidConfig`] when the policy fails validation or
    /// when a new entry would exceed `MAX_PROJECTED_CONTEXTS`.
    pub async fn enable_broadcast_projection(
        &self,
        context_id: &str,
        broadcast_key: scp_core::crypto::sender_keys::BroadcastKey,
        admission: scp_core::context::broadcast::BroadcastAdmission,
        projection_policy: Option<scp_core::context::params::ProjectionPolicy>,
    ) -> Result<(), NodeError> {
        projection::validate_projection_policy(admission, projection_policy.as_ref())
            .map_err(NodeError::InvalidConfig)?;

        let routing_id = projection::compute_routing_id(context_id);
        let mut registry = self.state.projected_contexts.write().await;

        match registry.get_mut(&routing_id) {
            Some(entry) => {
                entry.insert_key(broadcast_key);
                entry.admission = admission;
                entry.projection_policy = projection_policy;
            }
            None => {
                if registry.len() >= Self::MAX_PROJECTED_CONTEXTS {
                    return Err(NodeError::InvalidConfig(format!(
                        "projected context limit ({}) reached",
                        Self::MAX_PROJECTED_CONTEXTS
                    )));
                }
                let fresh =
                    ProjectedContext::new(context_id, broadcast_key, admission, projection_policy);
                registry.insert(routing_id, fresh);
            }
        }

        drop(registry);
        Ok(())
    }

    /// Removes the projection entry for `context_id`, if any.
    pub async fn disable_broadcast_projection(&self, context_id: &str) {
        let routing_id = projection::compute_routing_id(context_id);
        let mut projected = self.state.projected_contexts.write().await;
        projected.remove(&routing_id);
    }

    /// Feeds the keys rotated by a governance ban into the projected context
    /// for `context_id`. For a `Full` revocation scope, only the newly rotated
    /// epochs are retained afterwards. Does nothing when the context is not
    /// projected.
    pub async fn propagate_ban_keys(
        &self,
        context_id: &str,
        ban_result: &scp_core::context::broadcast::GovernanceBanResult,
    ) {
        use scp_core::context::governance::RevocationScope;

        let routing_id = projection::compute_routing_id(context_id);
        let mut registry = self.state.projected_contexts.write().await;
        let Some(projected) = registry.get_mut(&routing_id) else {
            return;
        };

        for rotation in &ban_result.rotated_authors {
            projected.insert_key(rotation.new_key.clone());
        }

        if ban_result.scope == RevocationScope::Full {
            let mut new_epochs: std::collections::HashSet<u64> = std::collections::HashSet::new();
            new_epochs.extend(ban_result.rotated_authors.iter().map(|r| r.new_epoch));
            projected.retain_only_epochs(&new_epochs);
        }
    }
}
/// Convenience entry point: returns a fresh `ApplicationNodeBuilder`
/// (equivalent to `ApplicationNodeBuilder::new()`).
#[must_use]
pub fn builder() -> ApplicationNodeBuilder {
ApplicationNodeBuilder::new()
}
/// Storage key under which the persisted identity envelope is read.
const IDENTITY_STORAGE_KEY: &str = "scp/identity";
/// Serializable pair of identity and DID document, as kept in storage.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct PersistedIdentity {
/// The stored SCP identity.
identity: ScpIdentity,
/// The DID document stored alongside the identity.
document: DidDocument,
}
/// Where the node's identity comes from when the builder is finalised.
enum IdentitySource<K: KeyCustody, D: DidMethod> {
/// Create a new identity through `did_method` using keys from `key_custody`.
Generate {
key_custody: Arc<K>,
did_method: Arc<D>,
},
/// Use an identity supplied by the caller (boxed to keep the enum small).
Explicit(Box<ExplicitIdentity<D>>),
}
/// Caller-supplied identity, its DID document, and the DID method to use with it.
struct ExplicitIdentity<D: DidMethod> {
/// The pre-existing identity.
identity: ScpIdentity,
/// DID document belonging to `identity`.
document: DidDocument,
/// DID method handed back together with the identity when it is resolved.
did_method: Arc<D>,
}
/// Type-state marker: the builder's domain choice has not been made yet
/// (the default `Dom` parameter of `ApplicationNodeBuilder`).
pub struct NoDomain;
/// Type-state marker: a domain was set via `ApplicationNodeBuilder::domain`.
pub struct HasDomain;
/// Type-state marker: the builder was explicitly configured without a domain
/// via `ApplicationNodeBuilder::no_domain`.
pub struct HasNoDomain;
/// Type-state marker: no identity source has been chosen yet
/// (the default `Id` parameter of `ApplicationNodeBuilder`).
pub struct NoIdentity;
/// Type-state marker: an identity source has been chosen.
pub struct HasIdentity;
/// The way a node without a domain is reachable from outside, as chosen by a
/// `NatStrategy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReachabilityTier {
/// Reachable through a UPnP/NAT-PMP port mapping at `external_addr`.
Upnp {
external_addr: SocketAddr,
},
/// Reachable at the STUN-discovered `external_addr`.
Stun {
external_addr: SocketAddr,
},
/// Reachable only via a bridge relay at `bridge_url`.
Bridge {
bridge_url: String,
},
}
/// Strategy for choosing a `ReachabilityTier` for the relay.
///
/// The method returns a boxed future so the trait can be used as
/// `dyn NatStrategy`.
pub trait NatStrategy: Send + Sync {
/// Selects the reachability tier for a relay listening on `relay_port`.
///
/// The returned future resolves to the chosen tier, or to a `NodeError`
/// when no tier could be established.
fn select_tier(
&self,
relay_port: u16,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
>;
}
/// Built-in STUN endpoints as `(socket address literal, label)` pairs, used
/// when no STUN server override is configured. Each address must parse as a
/// `SocketAddr`.
// NOTE(review): the addresses are pinned IP literals; presumably they mirror the
// hostnames in the labels — confirm they are still current.
const DEFAULT_STUN_ENDPOINTS: &[(&str, &str)] = &[
("74.125.250.129:19302", "stun1.l.google.com"),
("64.233.163.127:19302", "stun2.l.google.com"),
];
/// Default `NatStrategy`: tries a UPnP port mapping, then STUN, then a bridge relay.
pub struct DefaultNatStrategy {
/// Optional STUN server override; must parse as a `SocketAddr`.
stun_server: Option<String>,
/// Optional bridge relay URL used as the last-resort tier.
bridge_relay: Option<String>,
/// Optional port mapper; the UPnP tier is skipped when absent.
port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
/// Optional reachability probe; a default probe is created when absent.
reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
}
impl DefaultNatStrategy {
    /// Creates a strategy with an optional STUN override and optional bridge
    /// relay URL; no port mapper or custom reachability probe is installed.
    #[must_use]
    pub fn new(stun_server: Option<String>, bridge_relay: Option<String>) -> Self {
        Self {
            port_mapper: None,
            reachability_probe: None,
            stun_server,
            bridge_relay,
        }
    }

    /// Installs the port mapper used for the Tier 1 (UPnP/NAT-PMP) attempt.
    #[must_use]
    pub fn with_port_mapper(self, mapper: Arc<dyn scp_transport::nat::PortMapper>) -> Self {
        Self {
            port_mapper: Some(mapper),
            ..self
        }
    }

    /// Installs a custom reachability probe in place of the default one.
    #[must_use]
    pub fn with_reachability_probe(
        self,
        probe: Arc<dyn scp_transport::nat::ReachabilityProbe>,
    ) -> Self {
        Self {
            reachability_probe: Some(probe),
            ..self
        }
    }

    /// Builds the STUN endpoint list: the single configured override when
    /// present, otherwise every entry of `DEFAULT_STUN_ENDPOINTS`.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::Nat`] when the override is not a valid `SocketAddr`.
    fn build_stun_endpoints(&self) -> Result<Vec<scp_transport::nat::StunEndpoint>, NodeError> {
        use scp_transport::nat::StunEndpoint;

        let Some(override_url) = self.stun_server.as_ref() else {
            let mut defaults = Vec::with_capacity(DEFAULT_STUN_ENDPOINTS.len());
            for (addr_str, label) in DEFAULT_STUN_ENDPOINTS {
                // The table holds compile-time literals, so a parse failure
                // here would be a programming error.
                #[allow(clippy::expect_used)]
                let addr: SocketAddr = addr_str
                    .parse()
                    .expect("DEFAULT_STUN_ENDPOINTS contains valid SocketAddr literals");
                defaults.push(StunEndpoint {
                    addr,
                    label: (*label).to_owned(),
                });
            }
            return Ok(defaults);
        };

        let addr: SocketAddr = match override_url.parse() {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(NodeError::Nat(format!(
                    "invalid STUN server address '{override_url}': {e}"
                )));
            }
        };
        let endpoint = StunEndpoint {
            addr,
            label: override_url.clone(),
        };
        Ok(vec![endpoint])
    }

    /// Attempts Tier 1: map `relay_port` through the configured port mapper
    /// and confirm the mapped address with the reachability probe.
    ///
    /// Returns `None` when no mapper is configured, the mapping fails, or the
    /// self-test does not succeed (a probe error counts as unreachable).
    async fn try_tier1_upnp(
        &self,
        relay_port: u16,
        socket: &tokio::net::UdpSocket,
        probe: &dyn scp_transport::nat::ReachabilityProbe,
    ) -> Option<ReachabilityTier> {
        let mapper = self.port_mapper.as_ref()?;
        tracing::info!("attempting Tier 1 UPnP/NAT-PMP port mapping");

        let mapping = match mapper.map_port(relay_port).await {
            Ok(mapping) => mapping,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "UPnP port mapping failed, falling through to Tier 2"
                );
                return None;
            }
        };
        tracing::info!(
            protocol = %mapping.protocol,
            external_addr = %mapping.external_addr,
            "UPnP port mapping acquired, running reachability self-test"
        );

        let self_test = probe
            .probe_reachability(socket, mapping.external_addr)
            .await;
        if !self_test.unwrap_or(false) {
            tracing::warn!("Tier 1 reachability self-test failed, falling through to Tier 2");
            return None;
        }

        tracing::info!(
            external_addr = %mapping.external_addr,
            "Tier 1 reachability self-test passed"
        );
        Some(ReachabilityTier::Upnp {
            external_addr: mapping.external_addr,
        })
    }
}
impl NatStrategy for DefaultNatStrategy {
    /// Selects a tier in order: UPnP (Tier 1), STUN (Tier 2), bridge relay
    /// (Tier 3). A tier is accepted only after its reachability self-test
    /// passes; the bridge tier needs only a configured URL.
    ///
    /// # Errors
    ///
    /// Resolves to [`NodeError::Nat`] when the STUN endpoints are invalid, the
    /// probing socket cannot be bound, probing fails, or no tier succeeds.
    fn select_tier(
        &self,
        relay_port: u16,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
    > {
        Box::pin(async move {
            use scp_transport::nat::{DefaultReachabilityProbe, NatProber, ReachabilityProbe};

            let endpoints = self.build_stun_endpoints()?;

            // Prefer the injected probe; otherwise build one against the first
            // STUN endpoint (the endpoint list always has at least one entry).
            let probe: Arc<dyn ReachabilityProbe> = match self.reachability_probe.as_ref() {
                Some(configured) => Arc::clone(configured),
                None => Arc::new(DefaultReachabilityProbe::new(endpoints[0].addr, None)),
            };

            let socket = match tokio::net::UdpSocket::bind("0.0.0.0:0").await {
                Ok(socket) => socket,
                Err(e) => {
                    return Err(NodeError::Nat(format!(
                        "failed to bind UDP socket for NAT probing: {e}"
                    )));
                }
            };

            let prober = NatProber::new(endpoints, None)
                .map_err(|e| NodeError::Nat(format!("failed to create NAT prober: {e}")))?;
            let probe_result = prober
                .probe_with_socket(&socket)
                .await
                .map_err(|e| NodeError::Nat(format!("NAT probing failed: {e}")))?;
            tracing::info!(
                nat_type = %probe_result.nat_type,
                external_addr = ?probe_result.external_addr,
                "NAT type probed"
            );

            // Tier 1: UPnP / NAT-PMP.
            let tier1 = self.try_tier1_upnp(relay_port, &socket, &*probe).await;
            if let Some(tier) = tier1 {
                return Ok(tier);
            }

            // Tier 2: STUN — only for hole-punchable NATs with a known address.
            if probe_result.nat_type.is_hole_punchable() {
                if let Some(external_addr) = probe_result.external_addr {
                    tracing::info!(
                        external_addr = %external_addr,
                        "attempting Tier 2 STUN, running reachability self-test"
                    );
                    let self_test = probe.probe_reachability(&socket, external_addr).await;
                    if self_test.unwrap_or(false) {
                        tracing::info!(
                            external_addr = %external_addr,
                            "Tier 2 reachability self-test passed"
                        );
                        return Ok(ReachabilityTier::Stun { external_addr });
                    }
                    tracing::warn!(
                        "Tier 2 reachability self-test failed, falling through to Tier 3"
                    );
                }
            }

            // Tier 3: bridge relay, if one is configured.
            match self.bridge_relay.as_ref() {
                Some(bridge_url) => Ok(ReachabilityTier::Bridge {
                    bridge_url: bridge_url.clone(),
                }),
                None => Err(NodeError::Nat(
                    "all reachability tiers failed: NAT is symmetric and no bridge relay configured"
                        .into(),
                )),
            }
        })
    }
}
/// Source of TLS certificate material for a node with a domain.
///
/// `provision` returns a boxed future so the trait can be used as
/// `dyn TlsProvider`.
pub trait TlsProvider: Send + Sync {
/// Obtains certificate data, resolving to a `tls::TlsError` on failure.
fn provision(
&self,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
+ Send
+ '_,
>,
>;
/// Shared string-to-string map of pending challenges.
///
/// The default implementation returns a brand-new empty map on every call.
// NOTE(review): presumably maps ACME challenge tokens to key authorizations — confirm.
fn challenges(&self) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()))
}
/// Whether a challenge listener must run while provisioning.
/// Defaults to `false`.
fn needs_challenge_listener(&self) -> bool {
false
}
}
/// `TlsProvider` backed by the ACME provider from the `tls` module.
impl<S: Storage + 'static> TlsProvider for tls::AcmeProvider<S> {
/// Delegates to `AcmeProvider::load_or_provision`, boxing its future.
fn provision(
&self,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
+ Send
+ '_,
>,
> {
Box::pin(self.load_or_provision())
}
/// Returns the provider's own challenge map.
// NOTE(review): this relies on an inherent `AcmeProvider::challenges` method
// taking precedence over this trait method; without one the call would
// recurse into itself — confirm the inherent method exists.
fn challenges(&self) -> Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>> {
self.challenges()
}
/// The ACME provider always requires a challenge listener.
fn needs_challenge_listener(&self) -> bool {
true
}
}
/// Object-safe facade for publishing a DID document, so that code holding a
/// `dyn DidPublisher` does not need to be generic over the DID method.
pub(crate) trait DidPublisher: Send + Sync {
/// Publishes `document` on behalf of `identity`.
///
/// The returned future borrows `self` and both arguments for `'a`.
fn publish<'a>(
&'a self,
identity: &'a ScpIdentity,
document: &'a DidDocument,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>>;
}
/// Adapter exposing a concrete `DidMethod` through the `DidPublisher` trait.
struct DidMethodPublisher<D: DidMethod> {
/// The wrapped DID method.
inner: Arc<D>,
}
impl<D: DidMethod + 'static> DidPublisher for DidMethodPublisher<D> {
/// Forwards to the wrapped DID method's `publish` and boxes the future.
fn publish<'a>(
&'a self,
identity: &'a ScpIdentity,
document: &'a DidDocument,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>>
{
Box::pin(self.inner.publish(identity, document))
}
}
/// Interval between periodic reachability-tier re-evaluations: 30 minutes.
const TIER_REEVALUATION_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Handle to the background tier re-evaluation task.
struct TierReEvalHandle {
/// Join handle of the spawned task.
task: tokio::task::JoinHandle<()>,
/// Watch channel used to ask the task to stop (`true` means stop).
cancel_tx: tokio::sync::watch::Sender<bool>,
}
impl TierReEvalHandle {
/// Requests the background task to stop by sending `true` on the cancel
/// channel. A send failure is deliberately ignored.
fn stop(&self) {
let _ = self.cancel_tx.send(true);
}
}
/// Dropping the handle asks the task to stop; if the stop signal cannot be
/// delivered, the task is aborted as a fallback.
impl Drop for TierReEvalHandle {
    fn drop(&mut self) {
        let signalled = self.cancel_tx.send(true).is_ok();
        if !signalled {
            self.task.abort();
        }
    }
}
/// Derives the relay URL advertised for `tier`.
///
/// Address-based tiers (UPnP and STUN) yield `ws://<external_addr>/scp/v1`;
/// the bridge tier yields a copy of its bridge URL.
fn tier_to_relay_url(tier: &ReachabilityTier) -> String {
    let external_addr = match tier {
        ReachabilityTier::Bridge { bridge_url } => return bridge_url.clone(),
        ReachabilityTier::Upnp { external_addr } | ReachabilityTier::Stun { external_addr } => {
            external_addr
        }
    };
    format!("ws://{external_addr}/scp/v1")
}
/// Rewrites the relay endpoint in a copy of `current_doc` and republishes it.
///
/// Every `SCPRelay` service whose endpoint equals `current_url` is pointed at
/// `new_relay_url`. On successful publication a `TierChanged` event is sent on
/// `event_tx` (when provided; a send failure is ignored) and the new URL and
/// document are returned. On publication failure a warning is logged and
/// `None` is returned.
async fn apply_tier_change(
    current_url: &str,
    new_relay_url: &str,
    trigger_reason: &str,
    current_doc: &DidDocument,
    publisher: &dyn DidPublisher,
    identity: &ScpIdentity,
    event_tx: Option<&tokio::sync::mpsc::Sender<NatTierChange>>,
) -> Option<(String, DidDocument)> {
    let mut updated_doc = current_doc.clone();
    for svc in &mut updated_doc.service {
        let is_current_relay =
            svc.service_type == "SCPRelay" && svc.service_endpoint == current_url;
        if !is_current_relay {
            continue;
        }
        new_relay_url.clone_into(&mut svc.service_endpoint);
    }

    if let Err(e) = publisher.publish(identity, &updated_doc).await {
        tracing::warn!(error = %e, "DID document republish failed after tier change");
        return None;
    }

    if let Some(tx) = event_tx {
        let event = NatTierChange::TierChanged {
            previous_relay_url: current_url.to_owned(),
            new_relay_url: new_relay_url.to_owned(),
            reason: trigger_reason.to_owned(),
        };
        // Best effort: a closed receiver must not undo the successful publish.
        let _ = tx.send(event).await;
    }
    tracing::info!(new_url = %new_relay_url, did = %identity.did,
        "DID document republished with new relay URL");
    Some((new_relay_url.to_owned(), updated_doc))
}
/// Spawns the background task that re-evaluates the node's reachability tier.
///
/// The task wakes on whichever happens first: `reevaluation_interval`
/// elapsing, the optional `network_detector` reporting a change, or a
/// cancellation signal. On a timer or network trigger it asks `nat_strategy`
/// for a tier; if the resulting relay URL differs from the current one, the
/// DID document is updated and republished through `publisher`, and the
/// task's notion of the current URL and document advances only when that
/// publication succeeds.
///
/// Returns a handle holding the task and the sending side of its cancel
/// channel.
// Every argument is moved into the spawned task, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
fn spawn_tier_reevaluation(
    nat_strategy: Arc<dyn NatStrategy>,
    network_detector: Option<Arc<dyn NetworkChangeDetector>>,
    publisher: Arc<dyn DidPublisher>,
    identity: ScpIdentity,
    document: DidDocument,
    relay_port: u16,
    current_relay_url: String,
    event_tx: Option<tokio::sync::mpsc::Sender<NatTierChange>>,
    reevaluation_interval: Duration,
) -> TierReEvalHandle {
    let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
    let task = tokio::spawn(async move {
        let mut current_url = current_relay_url;
        let mut current_doc = document;
        loop {
            // NOTE(review): the sleep is re-created on every iteration, so any
            // `continue` below restarts the periodic timer.
            let trigger_reason = tokio::select! {
                () = tokio::time::sleep(reevaluation_interval) => {
                    // NOTE(review): the text says "30-minute" even though the
                    // interval is a parameter.
                    "periodic 30-minute re-evaluation (§10.12.1)"
                }
                result = async {
                    match network_detector.as_ref() {
                        Some(d) => d.wait_for_change().await,
                        // Without a detector this branch never completes.
                        None => std::future::pending().await,
                    }
                } => {
                    match result {
                        Ok(()) => "network change event detected",
                        Err(e) => {
                            tracing::warn!(error = %e, "network change detector error");
                            continue;
                        }
                    }
                }
                result = cancel_rx.changed() => {
                    // Stop when the sender is gone or the flag was set to `true`.
                    if result.is_err() || *cancel_rx.borrow() { return; }
                    continue;
                }
            };
            tracing::debug!(reason = trigger_reason, "tier re-evaluation triggered");
            let new_tier = match nat_strategy.select_tier(relay_port).await {
                Ok(tier) => tier,
                Err(e) => {
                    tracing::warn!(error = %e, "tier re-evaluation failed, keeping current tier");
                    continue;
                }
            };
            let new_relay_url = tier_to_relay_url(&new_tier);
            if new_relay_url == current_url {
                tracing::debug!(relay_url = %current_url, "tier re-evaluation: no change");
                continue;
            }
            tracing::info!(
                previous_url = %current_url, new_url = %new_relay_url,
                tier = ?new_tier, reason = trigger_reason,
                "reachability tier changed, updating DID document (§10.12.1)"
            );
            // Advance the local state only if republishing succeeded, so a
            // failed publish is retried on the next trigger.
            if let Some((url, doc)) = apply_tier_change(
                &current_url,
                &new_relay_url,
                trigger_reason,
                &current_doc,
                &*publisher,
                &identity,
                event_tx.as_ref(),
            )
            .await
            {
                current_url = url;
                current_doc = doc;
            }
        }
    });
    TierReEvalHandle { task, cancel_tx }
}
/// Type-state builder for `ApplicationNode`.
///
/// `K`, `D` and `S` are the key-custody, DID-method and storage types; `Dom`
/// and `Id` are marker types recording whether the domain and identity
/// choices have been made.
pub struct ApplicationNodeBuilder<
K: KeyCustody = NoOpCustody,
D: DidMethod = NoOpDidMethod,
S: Storage = NoOpStorage,
Dom = NoDomain,
Id = NoIdentity,
> {
/// Domain name, when one was configured.
domain: Option<String>,
/// How the node identity will be obtained.
identity_source: Option<IdentitySource<K, D>>,
/// Storage backend for the protocol store.
storage: Option<S>,
/// Blob storage backend handed to the relay server.
blob_storage: Option<BlobStorageBackend>,
/// Relay bind address.
bind_addr: Option<SocketAddr>,
/// Contact e-mail passed on for TLS provisioning.
acme_email: Option<String>,
/// STUN server override for NAT probing.
stun_server: Option<String>,
/// Bridge relay URL used as the last reachability tier.
bridge_relay: Option<String>,
/// Custom NAT strategy.
nat_strategy: Option<Arc<dyn NatStrategy>>,
/// Port mapper used for the UPnP tier.
port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
/// Custom reachability probe.
reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
/// Custom TLS provider.
tls_provider: Option<Arc<dyn TlsProvider>>,
/// Detector for network changes.
network_detector: Option<Arc<dyn NetworkChangeDetector>>,
/// Loopback address for the local dev API.
local_api_addr: Option<SocketAddr>,
/// HTTP bind address; `DEFAULT_HTTP_BIND_ADDR` is used when unset.
http_bind_addr: Option<SocketAddr>,
/// Allowed CORS origins; `None` when no origins were given.
cors_origins: Option<Vec<String>>,
/// Projection rate limit; `DEFAULT_PROJECTION_RATE_LIMIT` is used when unset.
projection_rate_limit: Option<u32>,
/// HTTP/3 configuration (only with the `http3` feature).
#[cfg(feature = "http3")]
http3_config: Option<scp_transport::http3::Http3Config>,
/// Whether a generated identity should be reloaded from / kept in storage.
persist_identity: bool,
/// Zero-sized marker for the domain type-state.
_domain_state: PhantomData<Dom>,
/// Zero-sized marker for the identity type-state.
_identity_state: PhantomData<Id>,
}
impl ApplicationNodeBuilder {
/// Creates a builder in its initial state: every option unset, identity
/// persistence off, and blob storage set to `BlobStorageBackend::default()`.
#[must_use]
pub fn new() -> Self {
Self {
domain: None,
identity_source: None,
storage: None,
// Blob storage is the only option that starts out populated.
blob_storage: Some(BlobStorageBackend::default()),
bind_addr: None,
acme_email: None,
stun_server: None,
bridge_relay: None,
nat_strategy: None,
port_mapper: None,
reachability_probe: None,
tls_provider: None,
network_detector: None,
local_api_addr: None,
http_bind_addr: None,
cors_origins: None,
projection_rate_limit: None,
#[cfg(feature = "http3")]
http3_config: None,
persist_identity: false,
_domain_state: PhantomData,
_identity_state: PhantomData,
}
}
}
/// `Default` for the initial builder state; identical to `ApplicationNodeBuilder::new()`.
impl Default
for ApplicationNodeBuilder<NoOpCustody, NoOpDidMethod, NoOpStorage, NoDomain, NoIdentity>
{
fn default() -> Self {
Self::new()
}
}
/// Domain selection: available only while the domain choice is still open.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Id>
    ApplicationNodeBuilder<K, D, S, NoDomain, Id>
{
    /// Sets the node's domain and moves the builder to the `HasDomain` state.
    /// All other settings are carried over unchanged.
    #[must_use]
    pub fn domain(self, domain: &str) -> ApplicationNodeBuilder<K, D, S, HasDomain, Id> {
        let Self {
            identity_source,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            ..
        } = self;
        ApplicationNodeBuilder {
            domain: Some(domain.to_owned()),
            identity_source,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            _domain_state: PhantomData,
            _identity_state: PhantomData,
        }
    }

    /// Declares that the node has no domain and moves the builder to the
    /// `HasNoDomain` state. All other settings are carried over unchanged.
    #[must_use]
    pub fn no_domain(self) -> ApplicationNodeBuilder<K, D, S, HasNoDomain, Id> {
        let Self {
            identity_source,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            ..
        } = self;
        ApplicationNodeBuilder {
            domain: None,
            identity_source,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            _domain_state: PhantomData,
            _identity_state: PhantomData,
        }
    }
}
/// Optional settings that can be applied in any builder state.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Dom, Id>
    ApplicationNodeBuilder<K, D, S, Dom, Id>
{
    /// Sets the relay bind address.
    #[must_use]
    pub const fn bind_addr(mut self, addr: SocketAddr) -> Self {
        self.bind_addr = Some(addr);
        self
    }

    /// Sets the contact e-mail used for TLS provisioning.
    #[must_use]
    pub fn acme_email(self, email: &str) -> Self {
        Self {
            acme_email: Some(email.to_owned()),
            ..self
        }
    }

    /// Overrides the STUN server used for NAT probing.
    #[must_use]
    pub fn stun_server(self, url: &str) -> Self {
        Self {
            stun_server: Some(url.to_owned()),
            ..self
        }
    }

    /// Sets the bridge relay URL used as the last reachability tier.
    #[must_use]
    pub fn bridge_relay(self, url: &str) -> Self {
        Self {
            bridge_relay: Some(url.to_owned()),
            ..self
        }
    }

    /// Installs a custom NAT strategy.
    #[must_use]
    pub fn nat_strategy(self, strategy: Arc<dyn NatStrategy>) -> Self {
        Self {
            nat_strategy: Some(strategy),
            ..self
        }
    }

    /// Installs the port mapper used for the UPnP tier.
    #[must_use]
    pub fn port_mapper(self, mapper: Arc<dyn scp_transport::nat::PortMapper>) -> Self {
        Self {
            port_mapper: Some(mapper),
            ..self
        }
    }

    /// Installs a custom reachability probe.
    #[must_use]
    pub fn reachability_probe(
        self,
        probe: Arc<dyn scp_transport::nat::ReachabilityProbe>,
    ) -> Self {
        Self {
            reachability_probe: Some(probe),
            ..self
        }
    }

    /// Installs a custom TLS provider.
    #[must_use]
    pub fn tls_provider(self, provider: Arc<dyn TlsProvider>) -> Self {
        Self {
            tls_provider: Some(provider),
            ..self
        }
    }

    /// Installs a network change detector.
    #[must_use]
    pub fn network_detector(self, detector: Arc<dyn NetworkChangeDetector>) -> Self {
        Self {
            network_detector: Some(detector),
            ..self
        }
    }

    /// Enables the local dev API on `addr`.
    ///
    /// # Panics
    ///
    /// Panics when `addr` is not a loopback address.
    #[must_use]
    pub fn local_api(self, addr: SocketAddr) -> Self {
        assert!(
            addr.ip().is_loopback(),
            "dev API bind address must be loopback (127.0.0.1 or ::1), got {addr}"
        );
        Self {
            local_api_addr: Some(addr),
            ..self
        }
    }

    /// Sets the HTTP bind address.
    #[must_use]
    pub const fn http_bind_addr(mut self, addr: SocketAddr) -> Self {
        self.http_bind_addr = Some(addr);
        self
    }

    /// Sets the allowed CORS origins; an empty list clears the setting.
    #[must_use]
    pub fn cors_origins(self, origins: Vec<String>) -> Self {
        let has_origins = !origins.is_empty();
        Self {
            cors_origins: has_origins.then_some(origins),
            ..self
        }
    }

    /// Sets the projection rate limit.
    #[must_use]
    pub const fn projection_rate_limit(mut self, rate: u32) -> Self {
        self.projection_rate_limit = Some(rate);
        self
    }

    /// Sets the HTTP/3 configuration (only with the `http3` feature).
    #[cfg(feature = "http3")]
    #[must_use]
    pub fn http3(self, config: scp_transport::http3::Http3Config) -> Self {
        Self {
            http3_config: Some(config),
            ..self
        }
    }
}
/// Storage selection: available only while the storage type is still the
/// `NoOpStorage` placeholder.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, Dom, Id>
    ApplicationNodeBuilder<K, D, NoOpStorage, Dom, Id>
{
    /// Supplies the storage backend, changing the builder's storage type to
    /// `S2`. All other settings are carried over unchanged.
    pub fn storage<S2: Storage + 'static>(
        self,
        storage: S2,
    ) -> ApplicationNodeBuilder<K, D, S2, Dom, Id> {
        // The previous (placeholder-typed) `storage` field is intentionally
        // not bound here; it is replaced by the argument.
        let Self {
            domain,
            identity_source,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            ..
        } = self;
        ApplicationNodeBuilder {
            domain,
            identity_source,
            storage: Some(storage),
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            _domain_state: PhantomData,
            _identity_state: PhantomData,
        }
    }
}
/// Blob storage selection, available in any builder state.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static, Dom, Id>
    ApplicationNodeBuilder<K, D, S, Dom, Id>
{
    /// Replaces the blob storage backend with anything convertible into a
    /// `BlobStorageBackend`.
    #[must_use]
    pub fn blob_storage(self, blob_storage: impl Into<BlobStorageBackend>) -> Self {
        let backend: BlobStorageBackend = blob_storage.into();
        Self {
            blob_storage: Some(backend),
            ..self
        }
    }
}
/// Identity selection: available only while no identity source is chosen.
impl<S: Storage + 'static, Dom>
    ApplicationNodeBuilder<NoOpCustody, NoOpDidMethod, S, Dom, NoIdentity>
{
    /// Uses a caller-supplied identity and DID document, together with the
    /// DID method to use with them. Moves the builder to `HasIdentity`; all
    /// other settings are carried over unchanged.
    pub fn identity<D2: DidMethod + 'static>(
        self,
        identity: ScpIdentity,
        document: DidDocument,
        did_method: Arc<D2>,
    ) -> ApplicationNodeBuilder<NoOpCustody, D2, S, Dom, HasIdentity> {
        let explicit = Box::new(ExplicitIdentity {
            identity,
            document,
            did_method,
        });
        let Self {
            domain,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            ..
        } = self;
        ApplicationNodeBuilder {
            domain,
            identity_source: Some(IdentitySource::Explicit(explicit)),
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            _domain_state: PhantomData,
            _identity_state: PhantomData,
        }
    }

    /// Generates a fresh identity at build time using `key_custody` and
    /// `did_method`. The builder's current persistence flag is kept as is.
    pub fn generate_identity_with<K2: KeyCustody + 'static, D2: DidMethod + 'static>(
        self,
        key_custody: Arc<K2>,
        did_method: Arc<D2>,
    ) -> ApplicationNodeBuilder<K2, D2, S, Dom, HasIdentity> {
        let persist_identity = self.persist_identity;
        self.into_generated_identity(key_custody, did_method, persist_identity)
    }

    /// Like [`Self::generate_identity_with`], but turns identity persistence
    /// on so the identity is looked up in storage at build time.
    pub fn identity_with_storage<K2: KeyCustody + 'static, D2: DidMethod + 'static>(
        self,
        key_custody: Arc<K2>,
        did_method: Arc<D2>,
    ) -> ApplicationNodeBuilder<K2, D2, S, Dom, HasIdentity> {
        self.into_generated_identity(key_custody, did_method, true)
    }

    /// Shared transition to `HasIdentity` with a `Generate` identity source
    /// and the given persistence flag; every other setting is carried over.
    fn into_generated_identity<K2: KeyCustody + 'static, D2: DidMethod + 'static>(
        self,
        key_custody: Arc<K2>,
        did_method: Arc<D2>,
        persist_identity: bool,
    ) -> ApplicationNodeBuilder<K2, D2, S, Dom, HasIdentity> {
        let Self {
            domain,
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            ..
        } = self;
        ApplicationNodeBuilder {
            domain,
            identity_source: Some(IdentitySource::Generate {
                key_custody,
                did_method,
            }),
            storage,
            blob_storage,
            bind_addr,
            acme_email,
            stun_server,
            bridge_relay,
            nat_strategy,
            port_mapper,
            reachability_probe,
            tls_provider,
            network_detector,
            local_api_addr,
            http_bind_addr,
            cors_origins,
            projection_rate_limit,
            #[cfg(feature = "http3")]
            http3_config,
            persist_identity,
            _domain_state: PhantomData,
            _identity_state: PhantomData,
        }
    }
}
/// Production build entry point; requires an `EncryptedStorage` backend.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: EncryptedStorage + 'static>
    ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
{
    /// Builds the node on top of a `ProtocolStore` wrapping the configured
    /// storage.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::MissingField`] when no storage was configured, or
    /// any error produced by the remaining build steps.
    pub async fn build(mut self) -> Result<ApplicationNode<S>, NodeError> {
        let Some(storage) = self.storage.take() else {
            return Err(NodeError::MissingField("storage"));
        };
        let store = Arc::new(ProtocolStore::new(storage));
        self.build_with_store(store).await
    }
}
/// Build entry point for tests and for the `allow_unencrypted_storage`
/// feature; accepts any `Storage` backend.
#[cfg(any(test, feature = "allow_unencrypted_storage"))]
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
    ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
{
    /// Builds the node on top of `ProtocolStore::new_for_testing`.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::MissingField`] when no storage was configured, or
    /// any error produced by the remaining build steps.
    pub async fn build_for_testing(mut self) -> Result<ApplicationNode<S>, NodeError> {
        let Some(storage) = self.storage.take() else {
            return Err(NodeError::MissingField("storage"));
        };
        let store = Arc::new(ProtocolStore::new_for_testing(storage));
        self.build_with_store(store).await
    }
}
/// Shared build path for the domain-configured builder.
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
ApplicationNodeBuilder<K, D, S, HasDomain, HasIdentity>
{
/// Assembles the node on top of an already constructed protocol store.
///
/// Steps, in order: resolve the identity, start the relay server, obtain a
/// TLS provider and run provisioning, then finish in domain mode when a
/// certificate was obtained. When provisioning yields a TLS error the build
/// falls through to the NAT-traversed (no-domain) path instead of failing.
///
/// Returns `NodeError::MissingField` when `domain`, the identity source, or
/// blob storage is absent; other errors come from the individual steps.
#[allow(clippy::too_many_lines)] async fn build_with_store(
self,
protocol_store: Arc<ProtocolStore<S>>,
) -> Result<ApplicationNode<S>, NodeError> {
let domain = self.domain.ok_or(NodeError::MissingField("domain"))?;
let identity_source = self
.identity_source
.ok_or(NodeError::MissingField("identity"))?;
let persist = self.persist_identity;
let (identity, document, did_method) =
resolve_identity_persistent(identity_source, persist, protocol_store.storage()).await?;
let bridge_secret = generate_bridge_secret();
// Without an explicit bind address the relay listens on loopback with an
// OS-assigned port.
let bind_addr = self
.bind_addr
.unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 0)));
let relay_config = RelayConfig {
bind_addr,
bridge_secret: Some(*bridge_secret),
..RelayConfig::default()
};
let blob_storage = Arc::new(
self.blob_storage
.ok_or(NodeError::MissingField("blob_storage"))?,
);
let relay_server = RelayServer::new(relay_config.clone(), Arc::clone(&blob_storage));
// These handles are taken before `start()` is called on the server.
let connection_tracker = relay_server.connection_tracker();
let subscription_registry = relay_server.subscriptions();
let (shutdown_handle, bound_addr) = relay_server.start().await?;
// A dev token is produced only when the local dev API address is set.
let dev_token = self.local_api_addr.map(generate_dev_token);
let http_bind_addr = self.http_bind_addr.unwrap_or(DEFAULT_HTTP_BIND_ADDR);
let tls_provider = resolve_tls(
self.tls_provider,
&domain,
&protocol_store,
self.acme_email.as_ref(),
);
// NOTE(review): an early `?` return here happens after the relay server was
// started; whether dropping `shutdown_handle` stops it is not visible in
// this file — confirm.
let (provision_result, acme_challenges) =
provision_with_challenge_listener(&*tls_provider).await?;
let rate_limit = self
.projection_rate_limit
.unwrap_or(DEFAULT_PROJECTION_RATE_LIMIT);
match provision_result {
// Certificate available: finish building in domain mode.
Ok(cert_data) => {
build_domain_inner(
domain,
identity,
document,
did_method,
protocol_store,
shutdown_handle,
bound_addr,
bridge_secret,
dev_token,
self.local_api_addr,
blob_storage,
relay_config,
http_bind_addr,
self.cors_origins.clone(),
rate_limit,
cert_data,
connection_tracker.clone(),
subscription_registry.clone(),
acme_challenges,
#[cfg(feature = "http3")]
self.http3_config,
)
.await
}
// TLS provisioning failed: log it and fall back to the NAT-traversed path.
Err(tls_err) => {
tracing::warn!(
domain = %domain, error = %tls_err,
"TLS provisioning failed, falling through to NAT-traversed mode (§10.12.8)"
);
let strategy = resolve_nat(
self.nat_strategy,
self.stun_server,
self.bridge_relay,
self.port_mapper,
self.reachability_probe,
);
build_no_domain_inner(
identity,
document,
did_method,
protocol_store,
shutdown_handle,
bound_addr,
strategy,
bridge_secret,
dev_token,
self.local_api_addr,
blob_storage,
relay_config,
Some(http_bind_addr),
self.cors_origins,
rate_limit,
self.network_detector,
connection_tracker,
subscription_registry,
)
.await
}
}
}
}
async fn resolve_identity<K: KeyCustody, D: DidMethod>(
source: IdentitySource<K, D>,
) -> Result<(ScpIdentity, DidDocument, Arc<D>), NodeError> {
match source {
IdentitySource::Generate {
key_custody,
did_method,
} => {
let (identity, document) = did_method.create(&*key_custody).await?;
Ok((identity, document, did_method))
}
IdentitySource::Explicit(e) => Ok((e.identity, e.document, e.did_method)),
}
}
/// Confirms that a persisted identity is still backed by the supplied key custody.
///
/// Each key handle recorded in `persisted` is resolved to a public key through
/// `key_custody` and compared with the persisted DID document: the identity key against
/// the `#0` verification method, the active signing key against `#active`, and — only
/// when one is recorded — the agent signing key against `#agent`.
///
/// # Errors
///
/// Returns [`NodeError::Storage`] when a handle cannot be resolved by the custody backend,
/// or when `verify_vm_match` reports that a key disagrees with the document.
async fn validate_persisted_custody<K: KeyCustody>(
    persisted: &PersistedIdentity,
    key_custody: &K,
) -> Result<(), NodeError> {
    let stored = &persisted.identity;
    let doc = &persisted.document;

    let identity_pub = match key_custody.public_key(&stored.identity_key).await {
        Ok(key) => key,
        Err(e) => {
            return Err(NodeError::Storage(format!(
                "persisted identity key handle not found in custody: {e}"
            )));
        }
    };
    verify_vm_match(doc, "#0", &identity_pub, "identity key")?;

    let active_pub = match key_custody.public_key(&stored.active_signing_key).await {
        Ok(key) => key,
        Err(e) => {
            return Err(NodeError::Storage(format!(
                "persisted active signing key handle not found in custody: {e}"
            )));
        }
    };
    verify_vm_match(doc, "#active", &active_pub, "active signing key")?;

    // The agent key is optional; nothing is checked when none was persisted.
    if let Some(agent_key) = stored.agent_signing_key.as_ref() {
        let agent_pub = match key_custody.public_key(agent_key).await {
            Ok(key) => key,
            Err(e) => {
                return Err(NodeError::Storage(format!(
                    "persisted agent signing key handle not found in custody: {e}"
                )));
            }
        };
        verify_vm_match(doc, "#agent", &agent_pub, "agent signing key")?;
    }

    Ok(())
}
/// Compares a custody public key with one verification method of a DID document.
///
/// The verification method is the first entry whose `id` ends with `vm_suffix`. Its
/// `public_key_multibase` must equal `z` followed by the base58 encoding of
/// `public_key`'s bytes.
///
/// A document with no matching verification method is accepted: the function returns
/// `Ok(())` without comparing anything.
///
/// # Errors
///
/// Returns [`NodeError::Storage`] naming `label` and `vm_suffix` when the two encodings
/// differ.
fn verify_vm_match(
    document: &DidDocument,
    vm_suffix: &str,
    public_key: &scp_platform::traits::PublicKey,
    label: &str,
) -> Result<(), NodeError> {
    let Some(vm) = document
        .verification_method
        .iter()
        .find(|vm| vm.id.ends_with(vm_suffix))
    else {
        return Ok(());
    };

    let expected_multibase = format!("z{}", bs58::encode(public_key.as_bytes()).into_string());
    if vm.public_key_multibase == expected_multibase {
        return Ok(());
    }

    Err(NodeError::Storage(format!(
        "custody {label} does not match DID document {vm_suffix} verification method \
         (custody: {expected_multibase}, document: {})",
        vm.public_key_multibase
    )))
}
/// Resolves the node identity, optionally reloading it from or saving it to `storage`.
///
/// Behaviour by input:
/// * `persist == false` — delegates to `resolve_identity`; storage is never touched.
/// * `IdentitySource::Explicit` — returned unchanged; nothing is read or written.
/// * `IdentitySource::Generate` with a record under `IDENTITY_STORAGE_KEY` — the record is
///   decoded, version-checked, validated against `key_custody`, and reused.
/// * `IdentitySource::Generate` with no record — a new identity is created, wrapped in a
///   `StoredValue` tagged with `CURRENT_STORE_VERSION`, serialized with `rmp_serde`, and
///   stored before being returned.
///
/// # Errors
///
/// Returns [`NodeError::Storage`] when reading, decoding, encoding, or writing the record
/// fails, when the stored version is newer than `CURRENT_STORE_VERSION`, or when custody
/// validation fails. Errors from `did_method.create` are propagated via `?`.
async fn resolve_identity_persistent<K: KeyCustody, D: DidMethod, S: Storage>(
    source: IdentitySource<K, D>,
    persist: bool,
    storage: &S,
) -> Result<(ScpIdentity, DidDocument, Arc<D>), NodeError> {
    if !persist {
        return resolve_identity(source).await;
    }

    let (key_custody, did_method) = match source {
        IdentitySource::Explicit(explicit) => {
            return Ok((explicit.identity, explicit.document, explicit.did_method));
        }
        IdentitySource::Generate {
            key_custody,
            did_method,
        } => (key_custody, did_method),
    };

    let stored = storage
        .retrieve(IDENTITY_STORAGE_KEY)
        .await
        .map_err(|e| NodeError::Storage(format!("failed to read persisted identity: {e}")))?;

    // First run: nothing stored yet, so create an identity and persist it.
    let Some(bytes) = stored else {
        let (identity, document) = did_method.create(&*key_custody).await?;
        let record = PersistedIdentity {
            identity: identity.clone(),
            document: document.clone(),
        };
        let envelope = StoredValue {
            version: CURRENT_STORE_VERSION,
            data: &record,
        };
        let encoded = rmp_serde::to_vec_named(&envelope).map_err(|e| {
            NodeError::Storage(format!("failed to serialize identity for persistence: {e}"))
        })?;
        storage
            .store(IDENTITY_STORAGE_KEY, &encoded)
            .await
            .map_err(|e| {
                NodeError::Storage(format!("failed to persist identity to storage: {e}"))
            })?;
        tracing::info!(
            did = %identity.did,
            "created and persisted new identity to storage"
        );
        return Ok((identity, document, did_method));
    };

    // Subsequent runs: decode the stored record and make sure it is still usable.
    let envelope: StoredValue<PersistedIdentity> = rmp_serde::from_slice(&bytes).map_err(|e| {
        NodeError::Storage(format!("failed to deserialize persisted identity: {e}"))
    })?;
    if envelope.version > CURRENT_STORE_VERSION {
        return Err(NodeError::Storage(format!(
            "persisted identity version {} is newer than supported version {}; \
             upgrade the binary or delete the stored identity",
            envelope.version, CURRENT_STORE_VERSION
        )));
    }

    let persisted = envelope.data;
    validate_persisted_custody(&persisted, &*key_custody).await?;
    tracing::info!(
        did = %persisted.identity.did,
        "reloaded persisted identity from storage"
    );
    Ok((persisted.identity, persisted.document, did_method))
}
/// Generates a fresh 32-byte bridge secret from the operating system's RNG.
///
/// The random bytes are written directly into the `Zeroizing` buffer. Filling a plain
/// `[u8; 32]` local first and then wrapping it would copy the array (it is `Copy`) and
/// leave the original, un-wiped bytes behind on the stack.
///
/// NOTE(review): returning the wrapper by value may still move the bytes; `Zeroizing`
/// only wipes the storage it owns at drop time.
fn generate_bridge_secret() -> Zeroizing<[u8; 32]> {
    let mut secret = Zeroizing::new([0u8; 32]);
    rand::RngCore::fill_bytes(&mut rand::rngs::OsRng, &mut secret[..]);
    secret
}
/// Creates a random dev-API token of the form `scp_local_token_<32 hex chars>`.
///
/// Sixteen bytes are drawn from the OS RNG and hex-encoded. Only the fixed prefix plus the
/// first eight hex characters are written to the log, together with `addr`; the full token
/// is returned to the caller.
fn generate_dev_token(addr: SocketAddr) -> String {
    use rand::RngCore;

    let mut entropy = [0u8; 16];
    rand::rngs::OsRng.fill_bytes(&mut entropy);
    let hex = hex::encode(entropy);
    let token = format!("scp_local_token_{hex}");

    // The token is pure ASCII, so slicing at a byte offset cannot split a character.
    let visible_len = "scp_local_token_".len() + 8;
    let masked = &token[..visible_len];
    tracing::info!(
        token_prefix = %masked,
        dev_bind_addr = ?addr,
        "dev API token generated (use node.dev_token() for full value)"
    );

    token
}
/// Picks the TLS provider for a domain-mode node.
///
/// A caller-supplied `provider` always wins. Otherwise a `tls::AcmeProvider` is built for
/// `domain` on top of `storage`, with the contact `acme_email` applied when one is given.
fn resolve_tls<S: Storage + 'static>(
    provider: Option<Arc<dyn TlsProvider>>,
    domain: &str,
    storage: &Arc<ProtocolStore<S>>,
    acme_email: Option<&String>,
) -> Arc<dyn TlsProvider> {
    match provider {
        Some(custom) => custom,
        None => {
            let acme = tls::AcmeProvider::new(domain, Arc::clone(storage));
            let acme = match acme_email {
                Some(email) => acme.with_email(email),
                None => acme,
            };
            Arc::new(acme)
        }
    }
}
/// Handle to the temporary HTTP listener that serves ACME HTTP-01 challenges.
struct AcmeChallengeListener {
/// Token whose cancellation triggers the server's graceful shutdown.
shutdown: CancellationToken,
/// The spawned server task; yields the serve result once the listener has exited.
task: tokio::task::JoinHandle<Result<(), NodeError>>,
}
impl AcmeChallengeListener {
    /// Signals the listener to shut down and waits for its task to finish.
    ///
    /// Stopping is best-effort and never fails the caller, but a listener that exited
    /// with a serve error, panicked, or was aborted is reported at `warn` level instead
    /// of being silently discarded.
    async fn stop(self) {
        self.shutdown.cancel();
        match self.task.await {
            Ok(Ok(())) => {}
            Ok(Err(serve_err)) => {
                tracing::warn!(
                    error = %serve_err,
                    "temporary ACME HTTP-01 challenge listener exited with an error"
                );
            }
            Err(join_err) => {
                tracing::warn!(
                    error = %join_err,
                    "temporary ACME HTTP-01 challenge listener task did not complete cleanly"
                );
            }
        }
        tracing::info!("temporary ACME HTTP-01 challenge listener stopped");
    }
}
async fn start_acme_challenge_listener(
challenges: Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>,
) -> Result<AcmeChallengeListener, NodeError> {
let router = tls::acme_challenge_router(challenges);
let shutdown = CancellationToken::new();
let listener = tokio::net::TcpListener::bind("0.0.0.0:80")
.await
.map_err(|e| {
NodeError::Serve(format!(
"failed to bind temporary ACME challenge listener on port 80: {e}"
))
})?;
let local_addr = listener
.local_addr()
.map_err(|e| NodeError::Serve(e.to_string()))?;
tracing::info!(
addr = %local_addr,
"temporary ACME HTTP-01 challenge listener started"
);
let shutdown_clone = shutdown.clone();
let task = tokio::spawn(async move {
axum::serve(listener, router)
.with_graceful_shutdown(shutdown_clone.cancelled_owned())
.await
.map_err(|e| NodeError::Serve(format!("ACME challenge listener error: {e}")))
});
Ok(AcmeChallengeListener { shutdown, task })
}
/// Runs `provider.provision()`, wrapping it in a temporary challenge listener if needed.
///
/// When the provider reports that it needs a challenge listener, one is started before
/// provisioning and stopped afterwards regardless of whether provisioning succeeded.
///
/// The outer `Result` only reflects failure to start the listener. The provisioning
/// outcome is returned inside the tuple so the caller can decide how to handle a TLS
/// failure. The second tuple element is the provider's challenge map when the provider
/// needs a listener, and `None` otherwise.
async fn provision_with_challenge_listener(
    provider: &dyn TlsProvider,
) -> Result<
    (
        Result<tls::CertificateData, tls::TlsError>,
        Option<Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>>,
    ),
    NodeError,
> {
    let challenges = provider.challenges();

    let mut acme_listener = None;
    if provider.needs_challenge_listener() {
        acme_listener = Some(start_acme_challenge_listener(Arc::clone(&challenges)).await?);
    }

    let result = provider.provision().await;

    if let Some(listener) = acme_listener {
        listener.stop().await;
    }

    let acme_challenges = provider.needs_challenge_listener().then_some(challenges);
    Ok((result, acme_challenges))
}
/// Picks the NAT traversal strategy for a node without a usable domain.
///
/// A caller-supplied `strategy` is returned untouched. Otherwise a `DefaultNatStrategy` is
/// built from the optional STUN server and bridge relay, with the port mapper and
/// reachability probe attached when provided.
fn resolve_nat(
    strategy: Option<Arc<dyn NatStrategy>>,
    stun_server: Option<String>,
    bridge_relay: Option<String>,
    port_mapper: Option<Arc<dyn scp_transport::nat::PortMapper>>,
    reachability_probe: Option<Arc<dyn scp_transport::nat::ReachabilityProbe>>,
) -> Arc<dyn NatStrategy> {
    if let Some(custom) = strategy {
        return custom;
    }

    let built = DefaultNatStrategy::new(stun_server, bridge_relay);
    let built = match port_mapper {
        Some(mapper) => built.with_port_mapper(mapper),
        None => built,
    };
    let built = match reachability_probe {
        Some(probe) => built.with_reachability_probe(probe),
        None => built,
    };
    Arc::new(built)
}
/// Finishes building a domain-mode [`ApplicationNode`] once certificate data is available.
///
/// Steps, in order:
/// 1. Adds `wss://{domain}/scp/v1` to the DID document as a relay service.
/// 2. Publishes the updated document through `did_method`.
/// 3. Builds a reloadable TLS server configuration from `cert_data`.
/// 4. Assembles the shared `http::NodeState` and the returned node.
///
/// The relay itself is not started here; the caller passes in its `shutdown_handle` and
/// `bound_addr`. The resulting node has no tier re-evaluation task or tier-change
/// receiver (both are `None`).
///
/// # Errors
///
/// Propagates failures from adding the relay service and from publishing (via `?`), and
/// returns [`NodeError::Tls`] when the TLS configuration cannot be built.
#[allow(clippy::too_many_arguments)]
async fn build_domain_inner<D: DidMethod + 'static, S: Storage + 'static>(
domain: String,
identity: ScpIdentity,
mut document: DidDocument,
did_method: Arc<D>,
storage: Arc<ProtocolStore<S>>,
shutdown_handle: ShutdownHandle,
bound_addr: SocketAddr,
bridge_secret: Zeroizing<[u8; 32]>,
dev_token: Option<String>,
dev_bind_addr: Option<SocketAddr>,
blob_storage: Arc<BlobStorageBackend>,
relay_config: RelayConfig,
http_bind_addr: SocketAddr,
cors_origins: Option<Vec<String>>,
projection_rate_limit: u32,
cert_data: tls::CertificateData,
connection_tracker: scp_transport::relay::rate_limit::ConnectionTracker,
subscription_registry: scp_transport::relay::subscription::SubscriptionRegistry,
acme_challenges: Option<Arc<tokio::sync::RwLock<std::collections::HashMap<String, String>>>>,
#[cfg(feature = "http3")] http3_config: Option<scp_transport::http3::Http3Config>,
) -> Result<ApplicationNode<S>, NodeError> {
// Advertise the public relay endpoint in the DID document before publishing it.
let relay_url = format!("wss://{domain}/scp/v1");
document.add_relay_service(&relay_url)?;
did_method.publish(&identity, &document).await?;
let (tls_server_config, cert_resolver) =
tls::build_reloadable_tls_config(&cert_data).map_err(NodeError::Tls)?;
tracing::info!(
domain = %domain, relay_url = %relay_url,
bound_addr = %bound_addr, did = %identity.did,
"application node started (domain mode, TLS active)"
);
// Shared state for the HTTP layer; the context maps start empty.
let state = Arc::new(http::NodeState {
did: identity.did.clone(),
relay_url,
broadcast_contexts: tokio::sync::RwLock::new(HashMap::new()),
relay_addr: bound_addr,
bridge_secret,
dev_token,
dev_bind_addr,
projected_contexts: tokio::sync::RwLock::new(HashMap::new()),
blob_storage,
relay_config,
start_time: std::time::Instant::now(),
http_bind_addr,
shutdown_token: CancellationToken::new(),
cors_origins,
projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
projection_rate_limit,
),
tls_config: Some(Arc::new(tls_server_config)),
cert_resolver: Some(cert_resolver),
did_document: document.clone(),
connection_tracker,
subscription_registry,
acme_challenges,
bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
});
Ok(ApplicationNode {
domain: Some(domain),
relay: RelayHandle {
bound_addr,
shutdown_handle,
},
identity: IdentityHandle { identity, document },
storage,
state,
// Domain mode does not run NAT tier re-evaluation.
tier_reeval: None,
tier_change_rx: None,
#[cfg(feature = "http3")]
http3_config,
})
}
/// Finishes building an [`ApplicationNode`] that has no domain and therefore no TLS.
///
/// Steps, in order:
/// 1. Resolves the HTTP bind address, falling back to `DEFAULT_HTTP_BIND_ADDR`.
/// 2. Asks `nat_strategy` for a reachability tier using the HTTP bind port.
/// 3. Derives the relay URL: `ws://{external_addr}/scp/v1` for the UPnP and STUN tiers,
///    or the bridge URL verbatim for the bridge tier.
/// 4. Appends an `SCPRelay` service entry to the DID document and publishes it.
/// 5. Spawns the tier re-evaluation task and assembles the node state.
///
/// The relay itself is not started here; the caller passes in its `shutdown_handle` and
/// `bound_addr`.
///
/// # Errors
///
/// Propagates failures from tier selection and from DID publication (via `?`).
#[allow(clippy::too_many_arguments)]
async fn build_no_domain_inner<D: DidMethod + 'static, S: Storage + 'static>(
identity: ScpIdentity,
mut document: DidDocument,
did_method: Arc<D>,
storage: Arc<ProtocolStore<S>>,
shutdown_handle: ShutdownHandle,
bound_addr: SocketAddr,
nat_strategy: Arc<dyn NatStrategy>,
bridge_secret: Zeroizing<[u8; 32]>,
dev_token: Option<String>,
dev_bind_addr: Option<SocketAddr>,
blob_storage: Arc<BlobStorageBackend>,
relay_config: RelayConfig,
http_bind_addr: Option<SocketAddr>,
cors_origins: Option<Vec<String>>,
projection_rate_limit: u32,
network_detector: Option<Arc<dyn NetworkChangeDetector>>,
connection_tracker: scp_transport::relay::rate_limit::ConnectionTracker,
subscription_registry: scp_transport::relay::subscription::SubscriptionRegistry,
) -> Result<ApplicationNode<S>, NodeError> {
let http_bind_addr = http_bind_addr.unwrap_or(DEFAULT_HTTP_BIND_ADDR);
let tier = nat_strategy.select_tier(http_bind_addr.port()).await?;
let relay_url = match &tier {
ReachabilityTier::Upnp { external_addr } | ReachabilityTier::Stun { external_addr } => {
format!("ws://{external_addr}/scp/v1")
}
ReachabilityTier::Bridge { bridge_url } => bridge_url.clone(),
};
// Number the new service after the SCPRelay entries already present in the document.
let relay_count = document
.service
.iter()
.filter(|s| s.service_type == "SCPRelay")
.count();
document.service.push(scp_identity::document::Service {
id: format!("{}#scp-relay-{}", document.id, relay_count + 1),
service_type: "SCPRelay".to_owned(),
service_endpoint: relay_url.clone(),
});
did_method.publish(&identity, &document).await?;
tracing::info!(
tier = ?tier,
relay_url = %relay_url,
bound_addr = %bound_addr,
did = %identity.did,
"application node started (no-domain mode, §10.12.8)"
);
let publisher: Arc<dyn DidPublisher> = Arc::new(DidMethodPublisher {
inner: Arc::clone(&did_method),
});
// Channel (capacity 16) on which the background task sends tier events.
let (tier_event_tx, tier_event_rx) = tokio::sync::mpsc::channel(16);
// Separate copy of the identity handed to the background task; `identity` itself is
// still needed for the returned node below.
let bg_identity = ScpIdentity {
identity_key: identity.identity_key,
active_signing_key: identity.active_signing_key,
agent_signing_key: identity.agent_signing_key,
pre_rotation_commitment: identity.pre_rotation_commitment,
did: identity.did.clone(),
};
let tier_reeval = spawn_tier_reevaluation(
nat_strategy,
network_detector,
publisher,
bg_identity,
document.clone(),
http_bind_addr.port(),
relay_url.clone(),
Some(tier_event_tx),
TIER_REEVALUATION_INTERVAL,
);
let state = Arc::new(http::NodeState {
did: identity.did.clone(),
relay_url,
broadcast_contexts: tokio::sync::RwLock::new(HashMap::new()),
relay_addr: bound_addr,
bridge_secret,
dev_token,
dev_bind_addr,
projected_contexts: tokio::sync::RwLock::new(HashMap::new()),
blob_storage,
relay_config,
start_time: std::time::Instant::now(),
http_bind_addr,
shutdown_token: CancellationToken::new(),
cors_origins,
projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
projection_rate_limit,
),
// No TLS material and no ACME challenges in no-domain mode.
tls_config: None,
cert_resolver: None,
did_document: document.clone(),
connection_tracker,
subscription_registry,
acme_challenges: None,
bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
});
Ok(ApplicationNode {
domain: None,
relay: RelayHandle {
bound_addr,
shutdown_handle,
},
identity: IdentityHandle { identity, document },
storage,
state,
tier_reeval: Some(tier_reeval),
tier_change_rx: Some(tier_event_rx),
#[cfg(feature = "http3")]
http3_config: None,
})
}
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: EncryptedStorage + 'static>
    ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
{
    /// Builds a no-domain node on top of encrypted storage.
    ///
    /// The configured storage is wrapped in a [`ProtocolStore`] and the rest of the work
    /// is delegated to `build_with_store`.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::MissingField`] when no storage was configured, and otherwise
    /// whatever `build_with_store` reports.
    pub async fn build(mut self) -> Result<ApplicationNode<S>, NodeError> {
        let Some(storage) = self.storage.take() else {
            return Err(NodeError::MissingField("storage"));
        };
        let protocol_store = Arc::new(ProtocolStore::new(storage));
        self.build_with_store(protocol_store).await
    }
}
#[cfg(any(test, feature = "allow_unencrypted_storage"))]
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
    ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
{
    /// Builds a no-domain node on any [`Storage`] backend, encrypted or not.
    ///
    /// Only compiled for tests or with the `allow_unencrypted_storage` feature. The
    /// storage is wrapped with `ProtocolStore::new_for_testing` and the rest of the work
    /// is delegated to `build_with_store`.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::MissingField`] when no storage was configured, and otherwise
    /// whatever `build_with_store` reports.
    pub async fn build_for_testing(mut self) -> Result<ApplicationNode<S>, NodeError> {
        let Some(storage) = self.storage.take() else {
            return Err(NodeError::MissingField("storage"));
        };
        let protocol_store = Arc::new(ProtocolStore::new_for_testing(storage));
        self.build_with_store(protocol_store).await
    }
}
impl<K: KeyCustody + 'static, D: DidMethod + 'static, S: Storage + 'static>
ApplicationNodeBuilder<K, D, S, HasNoDomain, HasIdentity>
{
/// Shared no-domain build path used once the storage has been wrapped in a `ProtocolStore`.
///
/// Resolves (or reloads) the identity, starts the relay server, then hands everything to
/// `build_no_domain_inner`, which selects the NAT tier and publishes the DID document.
///
/// # Errors
///
/// Returns [`NodeError::MissingField`] when the identity source or blob storage is not
/// configured, and propagates identity, relay-start, and no-domain build errors.
async fn build_with_store(
self,
protocol_store: Arc<ProtocolStore<S>>,
) -> Result<ApplicationNode<S>, NodeError> {
let identity_source = self
.identity_source
.ok_or(NodeError::MissingField("identity"))?;
let persist = self.persist_identity;
let (identity, document, did_method) =
resolve_identity_persistent(identity_source, persist, protocol_store.storage()).await?;
// With no explicit bind address, use loopback with an OS-assigned port.
let bind_addr = self
.bind_addr
.unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 0)));
let bridge_secret = generate_bridge_secret();
let relay_config = RelayConfig {
bind_addr,
// The relay config receives its own copy of the secret bytes.
bridge_secret: Some(*bridge_secret),
..RelayConfig::default()
};
let blob_storage = Arc::new(
self.blob_storage
.ok_or(NodeError::MissingField("blob_storage"))?,
);
let relay_server = RelayServer::new(relay_config.clone(), Arc::clone(&blob_storage));
// Take these handles before `start()` is called on the server.
let connection_tracker = relay_server.connection_tracker();
let subscription_registry = relay_server.subscriptions();
let (shutdown_handle, bound_addr) = relay_server.start().await?;
// A dev token is generated only when a local API address was configured.
let dev_token = self.local_api_addr.map(generate_dev_token);
let strategy = resolve_nat(
self.nat_strategy,
self.stun_server,
self.bridge_relay,
self.port_mapper,
self.reachability_probe,
);
build_no_domain_inner(
identity,
document,
did_method,
protocol_store,
shutdown_handle,
bound_addr,
strategy,
bridge_secret,
dev_token,
self.local_api_addr,
blob_storage,
relay_config,
self.http_bind_addr,
self.cors_origins,
self.projection_rate_limit
.unwrap_or(DEFAULT_PROJECTION_RATE_LIMIT),
self.network_detector,
connection_tracker,
subscription_registry,
)
.await
}
}
/// Placeholder [`KeyCustody`] that holds no keys.
///
/// Every fallible operation resolves immediately to
/// `PlatformError::StorageError("NoOpCustody: not configured")`; `custody_type` reports
/// `CustodyType::InMemory`.
#[doc(hidden)]
pub struct NoOpCustody;

impl NoOpCustody {
    /// Ready future carrying the "not configured" error for any success type `T`.
    fn not_configured<T>() -> std::future::Ready<Result<T, scp_platform::PlatformError>> {
        std::future::ready(Err(scp_platform::PlatformError::StorageError(
            "NoOpCustody: not configured".to_owned(),
        )))
    }
}

impl KeyCustody for NoOpCustody {
    fn generate_keypair(
        &self,
        _key_type: scp_platform::KeyType,
    ) -> impl std::future::Future<
        Output = Result<scp_platform::KeyHandle, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::KeyHandle>()
    }

    fn public_key(
        &self,
        _handle: &scp_platform::KeyHandle,
    ) -> impl std::future::Future<
        Output = Result<scp_platform::PublicKey, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::PublicKey>()
    }

    fn sign(
        &self,
        _handle: &scp_platform::KeyHandle,
        _data: &[u8],
    ) -> impl std::future::Future<
        Output = Result<scp_platform::Signature, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::Signature>()
    }

    fn destroy_key(
        &self,
        _handle: &scp_platform::KeyHandle,
    ) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
        Self::not_configured::<()>()
    }

    fn dh_agree(
        &self,
        _handle: &scp_platform::KeyHandle,
        _peer_public: &[u8; 32],
    ) -> impl std::future::Future<
        Output = Result<scp_platform::SharedSecret, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::SharedSecret>()
    }

    fn derive_pseudonym(
        &self,
        _handle: &scp_platform::KeyHandle,
        _context_id: &[u8],
    ) -> impl std::future::Future<
        Output = Result<scp_platform::PseudonymKeypair, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::PseudonymKeypair>()
    }

    fn derive_rotatable_pseudonym(
        &self,
        _handle: &scp_platform::KeyHandle,
        _context_id: &[u8],
        _pseudonym_epoch: u64,
    ) -> impl std::future::Future<
        Output = Result<scp_platform::PseudonymKeypair, scp_platform::PlatformError>,
    > + Send {
        Self::not_configured::<scp_platform::PseudonymKeypair>()
    }

    fn custody_type(&self, _handle: &scp_platform::KeyHandle) -> scp_platform::CustodyType {
        scp_platform::CustodyType::InMemory
    }
}
/// Placeholder [`DidMethod`] that cannot create, publish, resolve, or rotate anything.
///
/// `create` and `publish` fail with `IdentityError::DhtPublishFailed`, `resolve` with
/// `IdentityError::DhtResolveFailed`, and `rotate` with `IdentityError::KeyRotationFailed`,
/// all carrying the message `NoOpDidMethod: not configured`. `verify` always returns
/// `false`.
#[doc(hidden)]
pub struct NoOpDidMethod;

impl NoOpDidMethod {
    /// The message attached to every error this placeholder produces.
    fn not_configured() -> String {
        "NoOpDidMethod: not configured".to_owned()
    }
}

impl DidMethod for NoOpDidMethod {
    fn create(
        &self,
        _key_custody: &impl KeyCustody,
    ) -> impl std::future::Future<Output = Result<(ScpIdentity, DidDocument), IdentityError>> + Send
    {
        let failure = IdentityError::DhtPublishFailed(Self::not_configured());
        std::future::ready(Err(failure))
    }

    fn verify(&self, _did_string: &str, _public_key: &[u8]) -> bool {
        false
    }

    fn publish(
        &self,
        _identity: &ScpIdentity,
        _document: &DidDocument,
    ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
        let failure = IdentityError::DhtPublishFailed(Self::not_configured());
        std::future::ready(Err(failure))
    }

    fn resolve(
        &self,
        _did_string: &str,
    ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send {
        let failure = IdentityError::DhtResolveFailed(Self::not_configured());
        std::future::ready(Err(failure))
    }

    fn rotate(
        &self,
        _identity: &ScpIdentity,
        _key_custody: &impl KeyCustody,
    ) -> impl std::future::Future<Output = Result<(ScpIdentity, DidDocument), IdentityError>> + Send
    {
        let failure = IdentityError::KeyRotationFailed(Self::not_configured());
        std::future::ready(Err(failure))
    }
}
/// Placeholder [`Storage`] that keeps nothing.
///
/// Writes and deletes report success without storing or removing anything, and every
/// read reports that nothing exists.
#[doc(hidden)]
#[derive(Debug, Default)]
pub struct NoOpStorage;
impl Storage for NoOpStorage {
/// Discards the data and reports success.
fn store(
&self,
_key: &str,
_data: &[u8],
) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
std::future::ready(Ok(()))
}
/// Always reports that no value is stored (`Ok(None)`).
fn retrieve(
&self,
_key: &str,
) -> impl std::future::Future<Output = Result<Option<Vec<u8>>, scp_platform::PlatformError>> + Send
{
std::future::ready(Ok(None))
}
/// Reports success without doing anything.
fn delete(
&self,
_key: &str,
) -> impl std::future::Future<Output = Result<(), scp_platform::PlatformError>> + Send {
std::future::ready(Ok(()))
}
/// Always returns an empty key list.
fn list_keys(
&self,
_prefix: &str,
) -> impl std::future::Future<Output = Result<Vec<String>, scp_platform::PlatformError>> + Send
{
std::future::ready(Ok(Vec::new()))
}
/// Always returns `0`.
fn delete_prefix(
&self,
_prefix: &str,
) -> impl std::future::Future<Output = Result<u64, scp_platform::PlatformError>> + Send {
std::future::ready(Ok(0))
}
/// Always reports that the key does not exist.
fn exists(
&self,
_key: &str,
) -> impl std::future::Future<Output = Result<bool, scp_platform::PlatformError>> + Send {
std::future::ready(Ok(false))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use std::sync::Arc;
use scp_identity::DidCache;
use scp_identity::cache::SystemClock;
use scp_identity::dht::DidDht;
use scp_identity::dht_client::InMemoryDhtClient;
use scp_platform::testing::{InMemoryKeyCustody, InMemoryStorage};
type TestDidDht = DidDht<InMemoryDhtClient, SystemClock>;
/// Builds a `DidDht` over a fresh in-memory DHT client and cache, signing via `custody`.
fn make_test_dht(custody: &Arc<InMemoryKeyCustody>) -> TestDidDht {
    DidDht::with_client_and_signer(
        Arc::new(InMemoryDhtClient::new()),
        Arc::new(DidCache::new()),
        TestDidDht::make_sign_fn(Arc::clone(custody)),
    )
}
/// Test TLS provider that always succeeds by generating a self-signed certificate.
struct SucceedingTlsProvider {
/// Domain passed to `tls::generate_self_signed`.
domain: String,
}
impl TlsProvider for SucceedingTlsProvider {
fn provision(
&self,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
+ Send
+ '_,
>,
> {
// Clone the domain so the future owns it instead of borrowing `self`.
let domain = self.domain.clone();
Box::pin(async move { tls::generate_self_signed(&domain) })
}
}
/// Test TLS provider whose provisioning always fails with a mock ACME error.
struct FailingTlsProvider;
impl TlsProvider for FailingTlsProvider {
fn provision(
&self,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<tls::CertificateData, tls::TlsError>>
+ Send
+ '_,
>,
> {
Box::pin(async {
Err(tls::TlsError::Acme(
"ACME challenge failed (mock)".to_owned(),
))
})
}
}
/// Domain-mode builder preconfigured for tests: in-memory storage, the domain
/// `test.example.com`, a TLS provider that always succeeds, and a generated identity.
fn test_builder() -> ApplicationNodeBuilder<
    InMemoryKeyCustody,
    TestDidDht,
    InMemoryStorage,
    HasDomain,
    HasIdentity,
> {
    let custody = Arc::new(InMemoryKeyCustody::new());
    let did_method = Arc::new(make_test_dht(&custody));
    let tls_provider = Arc::new(SucceedingTlsProvider {
        domain: "test.example.com".to_owned(),
    });

    ApplicationNodeBuilder::new()
        .storage(InMemoryStorage::new())
        .domain("test.example.com")
        .tls_provider(tls_provider)
        .generate_identity_with(custody, did_method)
}
/// Creates a fresh identity and DID document with an in-memory custody, returning all three.
async fn create_test_identity() -> (ScpIdentity, DidDocument, Arc<InMemoryKeyCustody>) {
    let custody = Arc::new(InMemoryKeyCustody::new());
    let dht = make_test_dht(&custody);
    let created = dht.create(&*custody).await;
    let (identity, document) = created.unwrap();
    (identity, document, custody)
}
// Compile-time check: a builder with both a domain and an identity source can be formed.
// Nothing is built or asserted at runtime.
#[tokio::test]
async fn type_state_builder_compiles_with_all_required_fields() {
let custody = Arc::new(InMemoryKeyCustody::new());
let did_method = Arc::new(make_test_dht(&custody));
let _builder = ApplicationNodeBuilder::new()
.domain("test.example.com")
.generate_identity_with(custody, did_method);
}
// Compile-time check: optional setters (`bind_addr`, `acme_email`) are callable both on a
// fresh builder and after the domain and identity have been set.
#[test]
fn type_state_optional_fields_at_any_point() {
// Optional setters before any type-state transition.
let _builder = ApplicationNodeBuilder::new()
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.acme_email("test@example.com");
let custody = Arc::new(InMemoryKeyCustody::new());
let did_method = Arc::new(make_test_dht(&custody));
// Optional setters after both transitions.
let _builder = ApplicationNodeBuilder::new()
.domain("test.example.com")
.generate_identity_with(custody, did_method)
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.acme_email("test@example.com");
}
// Building in domain mode with a generated identity yields a `did:dht:` DID, exactly one
// relay service pointing at the domain's `wss://` URL, and a relay bound to a real port.
#[tokio::test]
async fn build_with_generate_identity_creates_new_did() {
    let expected_relay_url = "wss://test.example.com/scp/v1";
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_builder()
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    // Identity.
    assert!(
        node.identity().did().starts_with("did:dht:"),
        "DID should start with did:dht:, got: {}",
        node.identity().did()
    );

    // Relay service published in the DID document.
    let relay_urls = node.identity().document().relay_service_urls();
    assert_eq!(relay_urls.len(), 1);
    assert_eq!(relay_urls[0], expected_relay_url);

    // Node-level accessors.
    assert_eq!(node.domain(), Some("test.example.com"));
    assert_eq!(node.relay_url(), expected_relay_url);
    assert_ne!(
        node.relay().bound_addr().port(),
        0,
        "relay should be bound to a real port"
    );
}
// An explicitly supplied identity keeps its DID through the build, and the domain's relay
// URL is added to its document.
#[tokio::test]
async fn build_with_explicit_identity_uses_provided_identity() {
let (identity, document, custody) = create_test_identity().await;
// Remember the DID before the identity is moved into the builder.
let original_did = identity.did.clone();
let did_method = Arc::new(make_test_dht(&custody));
let node = ApplicationNodeBuilder::new()
.storage(InMemoryStorage::new())
.domain("explicit.example.com")
.tls_provider(Arc::new(SucceedingTlsProvider {
domain: "explicit.example.com".to_owned(),
}))
.identity(identity, document, did_method)
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
assert_eq!(node.identity().did(), original_did);
let relay_urls = node.identity().document().relay_service_urls();
assert!(
relay_urls.contains(&"wss://explicit.example.com/scp/v1".to_owned()),
"expected relay URL in document, got: {relay_urls:?}"
);
}
// A domain-mode build publishes the DID document exactly once.
#[tokio::test]
async fn did_publication_happens_once_on_build() {
use std::sync::atomic::{AtomicU32, Ordering};
// DID method wrapper that counts `publish` calls and forwards everything to `inner`.
struct CountingDidMethod {
inner: TestDidDht,
publish_count: Arc<AtomicU32>,
}
impl DidMethod for CountingDidMethod {
fn create(
&self,
key_custody: &impl KeyCustody,
) -> impl std::future::Future<
Output = Result<(ScpIdentity, DidDocument), IdentityError>,
> + Send {
self.inner.create(key_custody)
}
fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
self.inner.verify(did_string, public_key)
}
fn publish(
&self,
identity: &ScpIdentity,
document: &DidDocument,
) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
// Counted when `publish` is called, before the returned future is polled.
self.publish_count.fetch_add(1, Ordering::SeqCst);
self.inner.publish(identity, document)
}
fn resolve(
&self,
did_string: &str,
) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
{
self.inner.resolve(did_string)
}
fn rotate(
&self,
identity: &ScpIdentity,
key_custody: &impl KeyCustody,
) -> impl std::future::Future<
Output = Result<(ScpIdentity, DidDocument), IdentityError>,
> + Send {
self.inner.rotate(identity, key_custody)
}
}
let custody = Arc::new(InMemoryKeyCustody::new());
let publish_count = Arc::new(AtomicU32::new(0));
let counting_method = Arc::new(CountingDidMethod {
inner: make_test_dht(&custody),
publish_count: Arc::clone(&publish_count),
});
let _node = ApplicationNodeBuilder::new()
.storage(InMemoryStorage::new())
.domain("counting.example.com")
.tls_provider(Arc::new(SucceedingTlsProvider {
domain: "counting.example.com".to_owned(),
}))
.generate_identity_with(custody, counting_method)
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
assert_eq!(
publish_count.load(Ordering::SeqCst),
1,
"DID should be published exactly once on build"
);
}
// A WebSocket client that presents the node's bridge token as a bearer credential can
// connect to the relay.
#[tokio::test]
async fn relay_accepts_connections_with_valid_bridge_token() {
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
let node = test_builder()
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
let addr = node.relay().bound_addr();
let token = node.bridge_token_hex();
let url = format!("ws://{addr}/");
// Attach the token as an `Authorization: Bearer …` header on the upgrade request.
let mut request = url.into_client_request().unwrap();
request
.headers_mut()
.insert("Authorization", format!("Bearer {token}").parse().unwrap());
let connect_result = tokio_tungstenite::connect_async(request).await;
assert!(
connect_result.is_ok(),
"relay should accept connections with valid bridge token, got error: {:?}",
connect_result.err()
);
}
// A WebSocket client that sends no bridge token is refused by the relay.
#[tokio::test]
async fn relay_rejects_connections_without_bridge_token() {
let node = test_builder()
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
let addr = node.relay().bound_addr();
let url = format!("ws://{addr}/");
// Plain URL: no Authorization header is added.
let connect_result = tokio_tungstenite::connect_async(&url).await;
assert!(
connect_result.is_err(),
"relay should reject connections without bridge token"
);
}
// The relay must already accept TCP connections at the moment the DID document is
// published.
#[tokio::test]
async fn relay_listening_before_did_publish() {
use std::sync::atomic::{AtomicBool, Ordering};
// DID method wrapper that probes `bind_addr` over TCP inside `publish` and records
// whether the connection succeeded.
struct RelayCheckDidMethod {
inner: TestDidDht,
relay_was_listening_at_publish: Arc<AtomicBool>,
bind_addr: SocketAddr,
}
impl DidMethod for RelayCheckDidMethod {
fn create(
&self,
key_custody: &impl KeyCustody,
) -> impl std::future::Future<
Output = Result<(ScpIdentity, DidDocument), IdentityError>,
> + Send {
self.inner.create(key_custody)
}
fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
self.inner.verify(did_string, public_key)
}
fn publish(
&self,
identity: &ScpIdentity,
document: &DidDocument,
) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
let addr = self.bind_addr;
let flag = Arc::clone(&self.relay_was_listening_at_publish);
let inner = &self.inner;
async move {
// A successful connect means something is listening on the relay address.
if tokio::net::TcpStream::connect(addr).await.is_ok() {
flag.store(true, Ordering::SeqCst);
}
inner.publish(identity, document).await
}
}
fn resolve(
&self,
did_string: &str,
) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
{
self.inner.resolve(did_string)
}
fn rotate(
&self,
identity: &ScpIdentity,
key_custody: &impl KeyCustody,
) -> impl std::future::Future<
Output = Result<(ScpIdentity, DidDocument), IdentityError>,
> + Send {
self.inner.rotate(identity, key_custody)
}
}
// Reserve a concrete free port by binding and immediately releasing it, so the probe
// knows which address the relay will use.
// NOTE(review): another process could grab the port between the drop and the relay's
// bind, so this is inherently racy.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let bind_addr = listener.local_addr().unwrap();
drop(listener);
let custody = Arc::new(InMemoryKeyCustody::new());
let relay_was_listening = Arc::new(AtomicBool::new(false));
let check_method = Arc::new(RelayCheckDidMethod {
inner: make_test_dht(&custody),
relay_was_listening_at_publish: Arc::clone(&relay_was_listening),
bind_addr,
});
let _node = ApplicationNodeBuilder::new()
.storage(InMemoryStorage::new())
.domain("relay-order.example.com")
.tls_provider(Arc::new(SucceedingTlsProvider {
domain: "relay-order.example.com".to_owned(),
}))
.generate_identity_with(custody, check_method)
.bind_addr(bind_addr)
.build_for_testing()
.await
.unwrap();
assert!(
relay_was_listening.load(Ordering::SeqCst),
"relay must be listening BEFORE DID document is published"
);
}
// In domain mode the node's relay URL is `wss://<domain>/scp/v1`.
#[tokio::test]
async fn builder_domain_sets_relay_url() {
let node = test_builder()
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
assert_eq!(node.relay_url(), "wss://test.example.com/scp/v1");
}
// A node built with caller-supplied storage builds successfully and exposes `storage()`.
#[tokio::test]
async fn builder_with_custom_storage() {
let custom_storage = InMemoryStorage::new();
let custody = Arc::new(InMemoryKeyCustody::new());
let did_method = Arc::new(make_test_dht(&custody));
let node = ApplicationNodeBuilder::new()
.storage(custom_storage)
.domain("storage.example.com")
.tls_provider(Arc::new(SucceedingTlsProvider {
domain: "storage.example.com".to_owned(),
}))
.generate_identity_with(custody, did_method)
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
// Only checks that the accessor is callable; the returned value is not inspected.
let _storage = node.storage();
}
// Setting an ACME contact email does not prevent the node from building.
#[tokio::test]
async fn builder_with_acme_email() {
let node = test_builder()
.acme_email("admin@example.com")
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
.build_for_testing()
.await
.unwrap();
assert!(
node.identity().did().starts_with("did:dht:"),
"node should build successfully with acme_email set"
);
}
/// Test NAT strategy that always reports a fixed reachability tier.
struct MockNatStrategy {
/// Tier returned (cloned) by every `select_tier` call.
tier: ReachabilityTier,
}
impl NatStrategy for MockNatStrategy {
fn select_tier(
&self,
_relay_port: u16,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
> {
let tier = self.tier.clone();
Box::pin(async move { Ok(tier) })
}
}
/// Test NAT strategy whose tier selection always fails with `NodeError::Nat`.
struct FailingNatStrategy;
impl NatStrategy for FailingNatStrategy {
fn select_tier(
&self,
_relay_port: u16,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
> {
Box::pin(async { Err(NodeError::Nat("all tiers failed".into())) })
}
}
/// No-domain builder preconfigured for tests: in-memory storage, a NAT strategy that
/// always reports `tier`, and a generated identity.
fn test_no_domain_builder(
    tier: ReachabilityTier,
) -> ApplicationNodeBuilder<
    InMemoryKeyCustody,
    TestDidDht,
    InMemoryStorage,
    HasNoDomain,
    HasIdentity,
> {
    let custody = Arc::new(InMemoryKeyCustody::new());
    let did_method = Arc::new(make_test_dht(&custody));
    let fixed_tier_strategy = Arc::new(MockNatStrategy { tier });

    ApplicationNodeBuilder::new()
        .storage(InMemoryStorage::new())
        .no_domain()
        .nat_strategy(fixed_tier_strategy)
        .generate_identity_with(custody, did_method)
}
// Compile-time check: `no_domain()` exists and can be followed by an identity source.
#[test]
fn no_domain_method_exists_and_transitions_type_state() {
let custody = Arc::new(InMemoryKeyCustody::new());
let did_method = Arc::new(make_test_dht(&custody));
let _builder = ApplicationNodeBuilder::new()
.no_domain()
.generate_identity_with(custody, did_method);
}
// Compile-time check: `stun_server()` is available on a fresh builder and can precede
// the `no_domain()` and identity transitions.
#[test]
fn stun_server_method_exists_on_builder() {
let _builder = ApplicationNodeBuilder::new().stun_server("stun.example.com:3478");
let custody = Arc::new(InMemoryKeyCustody::new());
let did_method = Arc::new(make_test_dht(&custody));
let _builder = ApplicationNodeBuilder::new()
.stun_server("stun.example.com:3478")
.no_domain()
.generate_identity_with(custody, did_method);
}
// Compile-time check: `bridge_relay()` is available on a fresh builder.
#[test]
fn bridge_relay_method_exists_on_builder() {
let _builder =
ApplicationNodeBuilder::new().bridge_relay("wss://bridge.example.com/scp/v1");
}
// With a STUN tier, a no-domain node has no domain, publishes a plain `ws://` relay URL
// built from the external address, and still binds its relay to a real port.
#[tokio::test]
async fn no_domain_build_skips_tls_and_publishes_ws_url() {
    let expected_url = "ws://198.51.100.7:32891/scp/v1";
    let stun_tier = ReachabilityTier::Stun {
        external_addr: SocketAddr::from(([198, 51, 100, 7], 32891)),
    };

    let node = test_no_domain_builder(stun_tier)
        .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
        .build_for_testing()
        .await
        .unwrap();

    // Mode and URL scheme.
    assert!(
        node.domain().is_none(),
        "no-domain mode should have None domain"
    );
    assert!(
        node.relay_url().starts_with("ws://"),
        "no-domain mode should publish ws:// URL, got: {}",
        node.relay_url()
    );
    assert_eq!(node.relay_url(), expected_url);

    // The same URL is the only relay service in the DID document.
    let relay_urls = node.identity().document().relay_service_urls();
    assert_eq!(relay_urls.len(), 1);
    assert_eq!(relay_urls[0], expected_url);

    assert!(
        node.identity().did().starts_with("did:dht:"),
        "DID should start with did:dht:"
    );
    assert_ne!(node.relay().bound_addr().port(), 0);
}
/// Builds a no-domain node whose mock NAT strategy is given a bridge tier
/// and checks that the bridge URL is used verbatim as the relay URL and
/// appears (with its `bridge_target=` query) in the DID document.
#[tokio::test]
async fn no_domain_build_with_bridge_publishes_wss_url() {
    let bridge_tier = ReachabilityTier::Bridge {
        bridge_url: "wss://bridge.example.com/scp/v1?bridge_target=deadbeef".to_owned(),
    };
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_no_domain_builder(bridge_tier)
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.relay_url().starts_with("wss://"),
        "bridge mode should publish wss:// URL, got: {}",
        node.relay_url()
    );
    assert_eq!(
        node.relay_url(),
        "wss://bridge.example.com/scp/v1?bridge_target=deadbeef"
    );

    let published = node.identity().document().relay_service_urls();
    assert_eq!(published.len(), 1);
    assert!(published[0].contains("bridge_target="));
}
/// Builds a no-domain node whose mock NAT strategy is given a UPnP tier and
/// checks that the relay URL is the `ws://` form of the external address.
#[tokio::test]
async fn no_domain_build_with_upnp_tier_publishes_ws_url() {
    let external_addr = SocketAddr::from(([203, 0, 113, 42], 8443));
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_no_domain_builder(ReachabilityTier::Upnp { external_addr })
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert_eq!(node.relay_url(), "ws://203.0.113.42:8443/scp/v1");
}
/// Builds a no-domain node and asserts that its domain is `None`.
///
/// This test only inspects `domain()`; it does not issue an HTTP request
/// against a `.well-known/scp` route.
#[tokio::test]
async fn no_domain_does_not_serve_well_known() {
    let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_no_domain_builder(ReachabilityTier::Stun { external_addr })
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.domain().is_none(),
        "no-domain mode: domain must be None to prevent .well-known/scp serving"
    );
}
/// Regression guard for domain mode: a node built from `test_builder()`
/// must report a `wss://` relay URL on `test.example.com` and expose that
/// domain.
#[tokio::test]
async fn domain_build_uses_wss_no_regression() {
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_builder()
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.relay_url().starts_with("wss://"),
        "domain mode should use wss://, got: {}",
        node.relay_url()
    );
    assert_eq!(node.relay_url(), "wss://test.example.com/scp/v1");
    assert_eq!(node.domain(), Some("test.example.com"));
}
/// Configures a domain together with `FailingTlsProvider` and a recording
/// NAT strategy, then checks that the build falls back to no-domain
/// behaviour: the domain is cleared, the NAT strategy was invoked with the
/// default HTTP port, and the relay URL is the `ws://` form of the tier's
/// external address.
#[tokio::test]
async fn domain_fallthrough_on_acme_failure_probes_nat() {
    use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};

    /// NAT strategy that notes that it was called and with which port,
    /// then resolves to its preconfigured tier.
    struct RecordingNatStrategy {
        called: Arc<AtomicBool>,
        received_port: Arc<AtomicU16>,
        tier: ReachabilityTier,
    }

    impl NatStrategy for RecordingNatStrategy {
        fn select_tier(
            &self,
            relay_port: u16,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>>
                    + Send
                    + '_,
            >,
        > {
            // Recording happens at call time, not when the future is polled.
            self.called.store(true, Ordering::SeqCst);
            self.received_port.store(relay_port, Ordering::SeqCst);
            let selected = self.tier.clone();
            Box::pin(async move { Ok(selected) })
        }
    }

    // Shared observation cells, cloned into the strategy below.
    let was_called = Arc::new(AtomicBool::new(false));
    let seen_port = Arc::new(AtomicU16::new(0));
    let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));

    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht_method = Arc::new(make_test_dht(&key_custody));

    let node = ApplicationNodeBuilder::new()
        .storage(InMemoryStorage::new())
        .domain("fail.example.com")
        .tls_provider(Arc::new(FailingTlsProvider))
        .nat_strategy(Arc::new(RecordingNatStrategy {
            called: Arc::clone(&was_called),
            received_port: Arc::clone(&seen_port),
            tier: ReachabilityTier::Stun { external_addr },
        }))
        .generate_identity_with(key_custody, dht_method)
        .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.domain().is_none(),
        "domain should be None after TLS fallthrough"
    );

    // The strategy must have run, and with the HTTP port.
    assert!(
        was_called.load(Ordering::SeqCst),
        "NAT strategy should have been called on ACME failure fallthrough"
    );
    assert_eq!(
        seen_port.load(Ordering::SeqCst),
        DEFAULT_HTTP_BIND_ADDR.port(),
        "NAT strategy should receive the HTTP port ({}), not the relay port",
        DEFAULT_HTTP_BIND_ADDR.port()
    );

    assert!(
        node.relay_url().starts_with("ws://"),
        "fallthrough should use ws:// URL, got: {}",
        node.relay_url()
    );
    assert_eq!(node.relay_url(), "ws://198.51.100.7:32891/scp/v1");

    assert_ne!(
        node.relay().bound_addr().port(),
        0,
        "relay should be bound to a real port after fallthrough"
    );
    assert!(
        node.identity().did().starts_with("did:dht:"),
        "DID should start with did:dht:"
    );
}
/// A no-domain build configured with `FailingNatStrategy` must return an
/// error, and that error must be the `NodeError::Nat` variant.
#[tokio::test]
async fn no_domain_nat_failure_returns_error() {
    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht_method = Arc::new(make_test_dht(&key_custody));

    let outcome = ApplicationNodeBuilder::new()
        .storage(InMemoryStorage::new())
        .no_domain()
        .nat_strategy(Arc::new(FailingNatStrategy))
        .generate_identity_with(key_custody, dht_method)
        .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
        .build_for_testing()
        .await;

    let err = match outcome {
        Err(err) => err,
        Ok(_) => panic!("build() should fail when all NAT tiers fail"),
    };
    assert!(
        matches!(err, NodeError::Nat(_)),
        "error should be NodeError::Nat, got: {err:?}"
    );
}
/// Wraps the test DHT method in a counter and checks that a no-domain build
/// calls `publish` exactly once.
#[tokio::test]
async fn no_domain_did_publication_happens_once() {
    use std::sync::atomic::{AtomicU32, Ordering};

    /// `DidMethod` decorator: forwards every call to `inner` and bumps
    /// `publish_count` each time `publish` is invoked.
    struct CountingDidMethod {
        inner: TestDidDht,
        publish_count: Arc<AtomicU32>,
    }

    impl DidMethod for CountingDidMethod {
        fn publish(
            &self,
            identity: &ScpIdentity,
            document: &DidDocument,
        ) -> impl std::future::Future<Output = Result<(), IdentityError>> + Send {
            // Counted when `publish` is called, before the inner future runs.
            self.publish_count.fetch_add(1, Ordering::SeqCst);
            self.inner.publish(identity, document)
        }

        fn create(
            &self,
            key_custody: &impl KeyCustody,
        ) -> impl std::future::Future<
            Output = Result<(ScpIdentity, DidDocument), IdentityError>,
        > + Send {
            self.inner.create(key_custody)
        }

        fn rotate(
            &self,
            identity: &ScpIdentity,
            key_custody: &impl KeyCustody,
        ) -> impl std::future::Future<
            Output = Result<(ScpIdentity, DidDocument), IdentityError>,
        > + Send {
            self.inner.rotate(identity, key_custody)
        }

        fn resolve(
            &self,
            did_string: &str,
        ) -> impl std::future::Future<Output = Result<DidDocument, IdentityError>> + Send
        {
            self.inner.resolve(did_string)
        }

        fn verify(&self, did_string: &str, public_key: &[u8]) -> bool {
            self.inner.verify(did_string, public_key)
        }
    }

    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let publishes = Arc::new(AtomicU32::new(0));
    let counting_method = Arc::new(CountingDidMethod {
        inner: make_test_dht(&key_custody),
        publish_count: Arc::clone(&publishes),
    });

    let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let tier = ReachabilityTier::Stun { external_addr };

    let _node = ApplicationNodeBuilder::new()
        .storage(InMemoryStorage::new())
        .no_domain()
        .nat_strategy(Arc::new(MockNatStrategy { tier }))
        .generate_identity_with(key_custody, counting_method)
        .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
        .build_for_testing()
        .await
        .unwrap();

    assert_eq!(
        publishes.load(Ordering::SeqCst),
        1,
        "DID should be published exactly once on no-domain build"
    );
}
/// Every address in `DEFAULT_STUN_ENDPOINTS` must parse as a socket address
/// with a non-zero port.
#[test]
fn default_stun_endpoints_parseable() {
    for (addr, _label) in DEFAULT_STUN_ENDPOINTS {
        match addr.parse::<std::net::SocketAddr>() {
            Ok(parsed) => assert_ne!(parsed.port(), 0),
            Err(e) => panic!("STUN endpoint '{addr}' not parseable: {e}"),
        }
    }
}
/// Serialises a STUN binding response carrying a single XOR-MAPPED-ADDRESS
/// attribute for `addr`.
///
/// Layout produced:
/// * 20-byte header: message type `0x0101`, body length, magic cookie,
///   `transaction_id`;
/// * attribute header: type `0x0020`, unpadded value length;
/// * attribute value: reserved `0x00`, family (`0x01` IPv4 / `0x02` IPv6),
///   port XOR-ed with the top 16 bits of the magic cookie, and the address
///   XOR-ed with the magic cookie (IPv4) or with the magic cookie followed
///   by the transaction id (IPv6);
/// * zero padding up to the next 4-byte boundary.
fn build_stun_binding_response(addr: SocketAddr, transaction_id: &[u8; 12]) -> Vec<u8> {
    const MAGIC_COOKIE: u32 = 0x2112_A442;
    const BINDING_RESPONSE: u16 = 0x0101;
    const ATTR_XOR_MAPPED_ADDRESS: u16 = 0x0020;
    const HEADER_LEN: usize = 20;
    const ATTR_HEADER_LEN: usize = 4;

    let cookie_bytes = MAGIC_COOKIE.to_be_bytes();
    // Both address families obfuscate the port the same way.
    let masked_port = addr.port() ^ (MAGIC_COOKIE >> 16) as u16;

    // Attribute value; the first byte is reserved and always zero.
    let mut value = vec![0x00_u8];
    match addr {
        SocketAddr::V4(v4) => {
            value.push(0x01);
            value.extend_from_slice(&masked_port.to_be_bytes());
            let masked_ip = u32::from(*v4.ip()) ^ MAGIC_COOKIE;
            value.extend_from_slice(&masked_ip.to_be_bytes());
        }
        SocketAddr::V6(v6) => {
            value.push(0x02);
            value.extend_from_slice(&masked_port.to_be_bytes());
            // 16-byte key: 4 cookie bytes followed by the 12-byte transaction id.
            let key = cookie_bytes.iter().chain(transaction_id.iter());
            let octets = v6.ip().octets();
            value.extend(octets.iter().zip(key).map(|(octet, mask)| octet ^ mask));
        }
    }

    let padded_value_len = (value.len() + 3) & !3;
    #[allow(clippy::cast_possible_truncation)]
    let value_len_field = value.len() as u16;
    #[allow(clippy::cast_possible_truncation)]
    let body_len_field = ATTR_HEADER_LEN as u16 + padded_value_len as u16;

    let total_len = HEADER_LEN + ATTR_HEADER_LEN + padded_value_len;
    let mut packet = Vec::with_capacity(total_len);

    // Message header.
    packet.extend_from_slice(&BINDING_RESPONSE.to_be_bytes());
    packet.extend_from_slice(&body_len_field.to_be_bytes());
    packet.extend_from_slice(&cookie_bytes);
    packet.extend_from_slice(transaction_id);

    // Attribute header and value, then zero-fill to the padded length.
    packet.extend_from_slice(&ATTR_XOR_MAPPED_ADDRESS.to_be_bytes());
    packet.extend_from_slice(&value_len_field.to_be_bytes());
    packet.extend_from_slice(&value);
    packet.resize(total_len, 0);

    packet
}
/// Spawns a task that answers `count` datagrams received on `socket`.
///
/// For each datagram, bytes 8..20 of the receive buffer are taken as the
/// transaction id and a binding response reporting `external_addr` is sent
/// back to the sender. The task panics if a receive or send fails; the
/// returned handle lets the caller await completion.
fn spawn_mock_stun_server(
    socket: tokio::net::UdpSocket,
    external_addr: SocketAddr,
    count: usize,
) -> tokio::task::JoinHandle<()> {
    let serve = async move {
        for _ in 0..count {
            // Fresh zeroed buffer per request.
            let mut datagram = [0u8; 576];
            let (_, peer) = socket.recv_from(&mut datagram).await.expect("recv");

            let txn_id: [u8; 12] = std::array::from_fn(|i| datagram[8 + i]);
            let reply = build_stun_binding_response(external_addr, &txn_id);

            socket.send_to(&reply, peer).await.expect("send");
        }
    };
    tokio::spawn(serve)
}
/// Test double holding a fixed reachability answer.
struct MockReachabilityProbe {
    /// The stored answer.
    reachable: std::sync::atomic::AtomicBool,
}

impl MockReachabilityProbe {
    /// Creates a probe whose stored answer is `reachable`.
    fn new(reachable: bool) -> Self {
        let reachable = reachable.into();
        Self { reachable }
    }
}
impl scp_transport::nat::ReachabilityProbe for MockReachabilityProbe {
    /// Ignores the socket and address and resolves to `Ok` with the stored
    /// flag. The flag is read when this method is called, not when the
    /// returned future is polled.
    fn probe_reachability<'a>(
        &'a self,
        _socket: &'a tokio::net::UdpSocket,
        _external_addr: SocketAddr,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<Output = Result<bool, scp_transport::TransportError>>
                + Send
                + 'a,
        >,
    > {
        use std::sync::atomic::Ordering;

        let verdict = self.reachable.load(Ordering::Relaxed);
        let answer = async move { Ok(verdict) };
        Box::pin(answer)
    }
}
/// Test double for a port mapper that hands out one preloaded result.
struct MockPortMapper {
    /// The result to return from the next `map_port` call; `None` once it
    /// has been taken.
    result: tokio::sync::Mutex<
        Option<
            Result<scp_transport::nat::PortMappingResult, scp_transport::nat::PortMappingError>,
        >,
    >,
}

impl MockPortMapper {
    /// Mapper preloaded with a successful UPnP-IGD mapping to `addr` and a
    /// 600-second TTL.
    fn ok(addr: SocketAddr) -> Self {
        let mapping = scp_transport::nat::PortMappingResult {
            external_addr: addr,
            ttl: std::time::Duration::from_secs(600),
            protocol: scp_transport::nat::MappingProtocol::UpnpIgd,
        };
        Self {
            result: tokio::sync::Mutex::new(Some(Ok(mapping))),
        }
    }

    /// Mapper preloaded with a `DiscoveryFailed` error carrying `msg`.
    fn fail(msg: &str) -> Self {
        let failure = scp_transport::nat::PortMappingError::DiscoveryFailed(msg.to_owned());
        Self {
            result: tokio::sync::Mutex::new(Some(Err(failure))),
        }
    }
}
impl scp_transport::nat::PortMapper for MockPortMapper {
    /// Takes the preloaded result out of the mock and returns it. Once the
    /// slot is empty, returns an `Internal("no more results")` error.
    fn map_port(
        &self,
        _internal_port: u16,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<
                        scp_transport::nat::PortMappingResult,
                        scp_transport::nat::PortMappingError,
                    >,
                > + Send
                + '_,
        >,
    > {
        Box::pin(async {
            let mut slot = self.result.lock().await;
            match slot.take() {
                Some(preloaded) => preloaded,
                None => Err(scp_transport::nat::PortMappingError::Internal(
                    "no more results".to_owned(),
                )),
            }
        })
    }

    /// Always fails with `Internal("renew not expected")`.
    fn renew(
        &self,
        _internal_port: u16,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<
                        scp_transport::nat::PortMappingResult,
                        scp_transport::nat::PortMappingError,
                    >,
                > + Send
                + '_,
        >,
    > {
        Box::pin(async {
            let unexpected = scp_transport::nat::PortMappingError::Internal(
                "renew not expected".to_owned(),
            );
            Err(unexpected)
        })
    }

    /// No-op removal; always succeeds.
    fn remove(
        &self,
        _internal_port: u16,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<Output = Result<(), scp_transport::nat::PortMappingError>>
                + Send
                + '_,
        >,
    > {
        Box::pin(async { Ok(()) })
    }
}
/// With a successful mock port mapping, a mock STUN server, a reachability
/// probe that always answers `false`, and a bridge URL configured,
/// `select_tier` must resolve to the bridge tier.
#[tokio::test]
async fn default_nat_strategy_upnp_self_test_failure_falls_through_to_bridge() {
    // Local mock STUN server that answers a single request.
    let stun_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind");
    let stun_endpoint = stun_socket.local_addr().expect("addr");
    let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
    let stun_task = spawn_mock_stun_server(stun_socket, stun_external, 1);

    let upnp_external = SocketAddr::from(([198, 51, 100, 1], 8443_u16));
    let port_mapper = Arc::new(MockPortMapper::ok(upnp_external));
    let never_reachable = Arc::new(MockReachabilityProbe::new(false));

    let strategy = DefaultNatStrategy::new(
        Some(stun_endpoint.to_string()),
        Some("wss://bridge.example.com/scp/v1".to_owned()),
    )
    .with_port_mapper(port_mapper)
    .with_reachability_probe(never_reachable);

    let tier = strategy.select_tier(4000).await.expect("should succeed");
    assert!(
        matches!(tier, ReachabilityTier::Bridge { .. }),
        "should fall through to bridge when all self-tests fail, got: {tier:?}"
    );

    stun_task.await.expect("server");
}
/// With a successful mock port mapping and a reachability probe that always
/// answers `true`, `select_tier` must resolve to the UPnP tier carrying the
/// mapped external address.
#[tokio::test]
async fn default_nat_strategy_upnp_self_test_success_returns_tier1() {
    // Local mock STUN server that answers a single request.
    let stun_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind");
    let stun_endpoint = stun_socket.local_addr().expect("addr");
    let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
    let stun_task = spawn_mock_stun_server(stun_socket, stun_external, 1);

    let upnp_external = SocketAddr::from(([198, 51, 100, 1], 8443_u16));
    let port_mapper = Arc::new(MockPortMapper::ok(upnp_external));
    let always_reachable = Arc::new(MockReachabilityProbe::new(true));

    let strategy = DefaultNatStrategy::new(Some(stun_endpoint.to_string()), None)
        .with_port_mapper(port_mapper)
        .with_reachability_probe(always_reachable);

    let tier = strategy.select_tier(4000).await.expect("should succeed");
    if let ReachabilityTier::Upnp { external_addr } = &tier {
        assert_eq!(*external_addr, upnp_external);
    } else {
        panic!("expected Tier 1 Upnp, got: {tier:?}");
    }

    stun_task.await.expect("server");
}
/// With a port mapper that fails and a reachability probe that always
/// answers `true`, `select_tier` must resolve to the STUN tier carrying the
/// address reported by the mock STUN server.
#[tokio::test]
async fn default_nat_strategy_upnp_mapping_failure_falls_through_to_stun() {
    // Local mock STUN server that answers a single request.
    let stun_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind");
    let stun_endpoint = stun_socket.local_addr().expect("addr");
    let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
    let stun_task = spawn_mock_stun_server(stun_socket, stun_external, 1);

    let failing_mapper = Arc::new(MockPortMapper::fail("no UPnP gateway"));
    let always_reachable = Arc::new(MockReachabilityProbe::new(true));

    let strategy = DefaultNatStrategy::new(
        Some(stun_endpoint.to_string()),
        Some("wss://bridge.example.com/scp/v1".to_owned()),
    )
    .with_port_mapper(failing_mapper)
    .with_reachability_probe(always_reachable);

    let tier = strategy.select_tier(4000).await.expect("should succeed");
    if let ReachabilityTier::Stun { external_addr } = &tier {
        assert_eq!(*external_addr, stun_external);
    } else {
        panic!("expected Tier 2 Stun, got: {tier:?}");
    }

    stun_task.await.expect("server");
}
/// With no port mapper configured and a reachability probe that always
/// answers `true`, `select_tier` must resolve to the STUN tier carrying the
/// address reported by the mock STUN server.
#[tokio::test]
async fn default_nat_strategy_no_port_mapper_skips_tier1() {
    // Local mock STUN server that answers a single request.
    let stun_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind");
    let stun_endpoint = stun_socket.local_addr().expect("addr");
    let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
    let stun_task = spawn_mock_stun_server(stun_socket, stun_external, 1);

    let always_reachable = Arc::new(MockReachabilityProbe::new(true));
    let strategy = DefaultNatStrategy::new(Some(stun_endpoint.to_string()), None)
        .with_reachability_probe(always_reachable);

    let tier = strategy.select_tier(4000).await.expect("should succeed");
    if let ReachabilityTier::Stun { external_addr } = &tier {
        assert_eq!(*external_addr, stun_external);
    } else {
        panic!("expected Tier 2 Stun, got: {tier:?}");
    }

    stun_task.await.expect("server");
}
/// With no port mapper, a reachability probe that always answers `false`,
/// and a bridge URL configured, `select_tier` must resolve to the bridge
/// tier carrying exactly the configured URL.
#[tokio::test]
async fn default_nat_strategy_stun_self_test_failure_falls_through_to_bridge() {
    // Local mock STUN server that answers a single request.
    let stun_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind");
    let stun_endpoint = stun_socket.local_addr().expect("addr");
    let stun_external = SocketAddr::from(([203, 0, 113, 42], 32891_u16));
    let stun_task = spawn_mock_stun_server(stun_socket, stun_external, 1);

    let never_reachable = Arc::new(MockReachabilityProbe::new(false));
    let strategy = DefaultNatStrategy::new(
        Some(stun_endpoint.to_string()),
        Some("wss://bridge.example.com/scp/v1".to_owned()),
    )
    .with_reachability_probe(never_reachable);

    let tier = strategy.select_tier(4000).await.expect("should succeed");
    if let ReachabilityTier::Bridge { bridge_url } = &tier {
        assert_eq!(bridge_url.as_str(), "wss://bridge.example.com/scp/v1");
    } else {
        panic!("expected Tier 3 Bridge, got: {tier:?}");
    }

    stun_task.await.expect("server");
}
/// Router-level tests: the `/.well-known/scp` document, the relay WebSocket
/// route, and merging with application routes.
mod http_tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use scp_core::well_known::WellKnownScp;
    use tower::ServiceExt;

    /// Cap passed to `axum::body::to_bytes` when buffering a response body.
    const BODY_LIMIT: usize = 1024 * 1024;

    /// Builds a body-less request for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder().uri(uri).body(Body::empty()).unwrap()
    }

    /// Builds a node from `test_builder()` bound to a loopback address with
    /// port 0.
    async fn build_test_node() -> ApplicationNode<InMemoryStorage> {
        let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));
        test_builder()
            .bind_addr(loopback_any_port)
            .build_for_testing()
            .await
            .unwrap()
    }

    /// The well-known route answers 200 with a JSON content type and a
    /// document carrying version 1, the node's DID, the relay URL, and no
    /// contexts.
    #[tokio::test]
    async fn well_known_returns_valid_json() {
        let node = build_test_node().await;
        let response = node
            .well_known_router()
            .oneshot(get_request("/.well-known/scp"))
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let content_type = response
            .headers()
            .get("content-type")
            .expect("should have content-type header")
            .to_str()
            .unwrap();
        assert!(
            content_type.contains("application/json"),
            "Content-Type should be application/json, got: {content_type}"
        );

        let raw = axum::body::to_bytes(response.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        let doc: WellKnownScp = serde_json::from_slice(&raw).unwrap();
        assert_eq!(doc.version, 1);
        assert!(
            doc.did.starts_with("did:dht:"),
            "DID should be the node's DID, got: {}",
            doc.did
        );
        assert_eq!(doc.relay, "wss://test.example.com/scp/v1");
        assert!(doc.contexts.is_none(), "no contexts registered yet");
    }

    /// A broadcast context registered before the request shows up in the
    /// document with its id, name, `broadcast` mode, and an
    /// `scp://context/<id>` URI prefix.
    #[tokio::test]
    async fn well_known_includes_registered_broadcast_contexts() {
        let node = build_test_node().await;
        node.register_broadcast_context("abc123".to_owned(), Some("Test Broadcast".to_owned()))
            .await
            .unwrap();

        let response = node
            .well_known_router()
            .oneshot(get_request("/.well-known/scp"))
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let raw = axum::body::to_bytes(response.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        let doc: WellKnownScp = serde_json::from_slice(&raw).unwrap();
        let contexts = doc.contexts.expect("should have contexts");
        assert_eq!(contexts.len(), 1);

        let entry = &contexts[0];
        assert_eq!(entry.id, "abc123");
        assert_eq!(entry.name.as_deref(), Some("Test Broadcast"));
        assert_eq!(entry.mode.as_deref(), Some("broadcast"));
        let uri = entry.uri.as_ref().unwrap();
        assert!(
            uri.starts_with("scp://context/abc123"),
            "URI should start with scp://context/abc123, got: {}",
            uri
        );
    }

    /// The document has no contexts before registration and exactly the
    /// newly registered context afterwards.
    #[tokio::test]
    async fn well_known_dynamic_updates_on_new_context() {
        let node = build_test_node().await;

        // Before registration.
        let response = node
            .well_known_router()
            .oneshot(get_request("/.well-known/scp"))
            .await
            .unwrap();
        let raw = axum::body::to_bytes(response.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        let before: WellKnownScp = serde_json::from_slice(&raw).unwrap();
        assert!(before.contexts.is_none());

        node.register_broadcast_context("def456".to_owned(), None)
            .await
            .unwrap();

        // After registration, using a freshly obtained router.
        let response = node
            .well_known_router()
            .oneshot(get_request("/.well-known/scp"))
            .await
            .unwrap();
        let raw = axum::body::to_bytes(response.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        let after: WellKnownScp = serde_json::from_slice(&raw).unwrap();
        let contexts = after.contexts.expect("should now have contexts");
        assert_eq!(contexts.len(), 1);
        assert_eq!(contexts[0].id, "def456");
    }

    /// Serves the relay router on a loopback listener and checks that a
    /// WebSocket client can connect at `/scp/v1`.
    #[tokio::test]
    async fn relay_router_upgrades_websocket() {
        let node = build_test_node().await;
        let _relay_addr = node.relay().bound_addr();
        let relay_router = node.relay_router();

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let http_addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            axum::serve(listener, relay_router).await.unwrap();
        });

        let ws_url = format!("ws://{http_addr}/scp/v1");
        let attempt = tokio_tungstenite::connect_async(&ws_url).await;
        assert!(
            attempt.is_ok(),
            "WebSocket upgrade at /scp/v1 should succeed, got error: {:?}",
            attempt.err()
        );

        // Stop the server task and wait for the abort to take effect.
        server.abort();
        let _ = server.await;
    }

    /// An application router merged with the well-known router serves both
    /// its own `/health` route and `/.well-known/scp`.
    #[tokio::test]
    async fn custom_app_routes_merge_with_scp_routes() {
        let node = build_test_node().await;
        let app_router =
            axum::Router::new().route("/health", axum::routing::get(|| async { "ok" }));
        let merged = app_router.merge(node.well_known_router());

        // Application route.
        let health = merged
            .clone()
            .oneshot(get_request("/health"))
            .await
            .unwrap();
        assert_eq!(health.status(), StatusCode::OK);
        let health_body = axum::body::to_bytes(health.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        assert_eq!(&health_body[..], b"ok");

        // SCP route.
        let well_known = merged
            .oneshot(get_request("/.well-known/scp"))
            .await
            .unwrap();
        assert_eq!(well_known.status(), StatusCode::OK);
        let raw = axum::body::to_bytes(well_known.into_body(), BODY_LIMIT)
            .await
            .unwrap();
        let doc: WellKnownScp = serde_json::from_slice(&raw).unwrap();
        assert_eq!(doc.version, 1);
    }
}
/// NAT strategy test double that cycles through a fixed list of tiers.
struct SequenceNatStrategy {
    /// Tiers to hand out, indexed by call number modulo the list length.
    tiers: std::sync::Mutex<Vec<ReachabilityTier>>,
    /// Number of `select_tier` calls made so far.
    call_count: std::sync::atomic::AtomicU32,
}

impl SequenceNatStrategy {
    /// Creates a strategy over `tiers` with the call counter at zero.
    fn new(tiers: Vec<ReachabilityTier>) -> Self {
        let tiers = std::sync::Mutex::new(tiers);
        let call_count = std::sync::atomic::AtomicU32::new(0);
        Self { tiers, call_count }
    }
}
impl NatStrategy for SequenceNatStrategy {
    /// Resolves to the tier at position `call number % tiers.len()`,
    /// ignoring the port. The tier is picked and the counter advanced when
    /// this method is called, not when the returned future is polled.
    fn select_tier(
        &self,
        _relay_port: u16,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
    > {
        use std::sync::atomic::Ordering;

        let call_index = self.call_count.fetch_add(1, Ordering::SeqCst) as usize;
        // Scope the lock so the guard is released before the future exists.
        let chosen = {
            let list = self.tiers.lock().unwrap();
            list[call_index % list.len()].clone()
        };
        Box::pin(async move { Ok(chosen) })
    }
}
/// Test double that counts publish calls.
struct RecordingPublisher {
    /// Number of publish calls recorded so far.
    publish_count: std::sync::atomic::AtomicU32,
}

impl RecordingPublisher {
    /// Creates a publisher with the counter at zero.
    fn new() -> Self {
        let publish_count = std::sync::atomic::AtomicU32::new(0);
        Self { publish_count }
    }

    /// Returns the current value of the counter.
    fn count(&self) -> u32 {
        use std::sync::atomic::Ordering;
        self.publish_count.load(Ordering::SeqCst)
    }
}
impl DidPublisher for RecordingPublisher {
    /// Increments the counter and resolves to `Ok(())`; the identity and
    /// document are ignored. The increment happens when this method is
    /// called, not when the returned future is polled.
    fn publish<'a>(
        &'a self,
        _identity: &'a ScpIdentity,
        _document: &'a DidDocument,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<(), IdentityError>> + Send + 'a>,
    > {
        use std::sync::atomic::Ordering;

        let _previous = self.publish_count.fetch_add(1, Ordering::SeqCst);
        Box::pin(async { Ok(()) })
    }
}
/// Short (50 ms) interval that tests pass to `spawn_tier_reevaluation` as
/// its final argument.
const TEST_REEVALUATION_INTERVAL: Duration = Duration::from_millis(50);
/// Upper bound (5 s) on how long tests wait for a tier-change event before
/// failing.
const TEST_EVENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Runs the re-evaluation task with a strategy that alternates between the
/// initial STUN tier and a UPnP tier, and checks that a `TierChanged` event
/// with a "periodic" reason arrives and the DID document has been
/// republished once.
///
/// The test passes `TEST_REEVALUATION_INTERVAL` as the interval rather than
/// waiting 30 real minutes.
#[tokio::test]
async fn tier_change_after_30_minutes_triggers_did_republish() {
    let did = "did:dht:test123";
    let initial_url = "ws://198.51.100.7:32891/scp/v1";

    let initial_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let new_addr = SocketAddr::from(([203, 0, 113, 42], 8443));
    let tier_sequence = vec![
        ReachabilityTier::Stun {
            external_addr: initial_addr,
        },
        ReachabilityTier::Upnp {
            external_addr: new_addr,
        },
    ];
    let strategy = Arc::new(SequenceNatStrategy::new(tier_sequence));
    let publisher = Arc::new(RecordingPublisher::new());
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);

    let relay_service = scp_identity::document::Service {
        id: format!("{did}#scp-relay-1"),
        service_type: "SCPRelay".to_owned(),
        service_endpoint: initial_url.to_owned(),
    };
    let document = DidDocument {
        context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
        id: did.to_owned(),
        verification_method: vec![],
        authentication: vec![],
        assertion_method: vec![],
        also_known_as: vec![],
        service: vec![relay_service],
    };
    let identity = ScpIdentity {
        identity_key: scp_platform::KeyHandle::new(1),
        active_signing_key: scp_platform::KeyHandle::new(2),
        agent_signing_key: None,
        pre_rotation_commitment: [0u8; 32],
        did: did.to_owned(),
    };

    let handle = spawn_tier_reevaluation(
        Arc::clone(&strategy) as Arc<dyn NatStrategy>,
        None,
        Arc::clone(&publisher) as Arc<dyn DidPublisher>,
        identity,
        document,
        32891,
        initial_url.to_owned(),
        Some(event_tx),
        TEST_REEVALUATION_INTERVAL,
    );

    let received = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
        .await
        .expect("timeout waiting for tier change event")
        .expect("channel closed unexpectedly");
    match received {
        NatTierChange::TierChanged {
            previous_relay_url,
            new_relay_url,
            reason,
        } => {
            assert_eq!(previous_relay_url, "ws://198.51.100.7:32891/scp/v1");
            assert_eq!(new_relay_url, "ws://203.0.113.42:8443/scp/v1");
            assert!(
                reason.contains("periodic"),
                "reason should mention periodic: {reason}"
            );
        }
        other => panic!("expected TierChanged, got {other:?}"),
    }

    assert_eq!(
        publisher.count(),
        1,
        "DID document should be republished after tier change"
    );
    handle.stop();
}
/// Runs the re-evaluation task with a one-hour interval and a channel-based
/// network change detector, sends one change notification, and checks that
/// a `TierChanged` event with a "network change" reason arrives and the DID
/// document has been republished once.
#[tokio::test]
async fn network_event_triggers_immediate_reevaluation() {
    let did = "did:dht:testnet123";
    let initial_url = "ws://198.51.100.7:32891/scp/v1";

    let new_addr = SocketAddr::from(([10, 0, 0, 1], 9999));
    let strategy = Arc::new(SequenceNatStrategy::new(vec![ReachabilityTier::Stun {
        external_addr: new_addr,
    }]));
    let publisher = Arc::new(RecordingPublisher::new());
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);

    // The sender half simulates the platform reporting a network change.
    let (net_change_tx, net_change_rx) = tokio::sync::mpsc::channel(16);
    let detector = Arc::new(scp_transport::nat::ChannelNetworkChangeDetector::new(
        net_change_rx,
    ));

    let relay_service = scp_identity::document::Service {
        id: format!("{did}#scp-relay-1"),
        service_type: "SCPRelay".to_owned(),
        service_endpoint: initial_url.to_owned(),
    };
    let document = DidDocument {
        context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
        id: did.to_owned(),
        verification_method: vec![],
        authentication: vec![],
        assertion_method: vec![],
        also_known_as: vec![],
        service: vec![relay_service],
    };
    let identity = ScpIdentity {
        identity_key: scp_platform::KeyHandle::new(1),
        active_signing_key: scp_platform::KeyHandle::new(2),
        agent_signing_key: None,
        pre_rotation_commitment: [0u8; 32],
        did: did.to_owned(),
    };

    // One-hour interval: far longer than TEST_EVENT_TIMEOUT.
    let hourly = Duration::from_secs(60 * 60);
    let handle = spawn_tier_reevaluation(
        Arc::clone(&strategy) as Arc<dyn NatStrategy>,
        Some(detector as Arc<dyn NetworkChangeDetector>),
        Arc::clone(&publisher) as Arc<dyn DidPublisher>,
        identity,
        document,
        32891,
        initial_url.to_owned(),
        Some(event_tx),
        hourly,
    );

    // Let the spawned task run before signalling.
    tokio::task::yield_now().await;
    tokio::time::sleep(Duration::from_millis(10)).await;
    net_change_tx.send(()).await.expect("send network change");

    let received = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
        .await
        .expect("timeout waiting for tier change event")
        .expect("channel closed unexpectedly");
    match received {
        NatTierChange::TierChanged {
            previous_relay_url,
            new_relay_url,
            reason,
        } => {
            assert_eq!(previous_relay_url, "ws://198.51.100.7:32891/scp/v1");
            assert_eq!(new_relay_url, "ws://10.0.0.1:9999/scp/v1");
            assert!(
                reason.contains("network change"),
                "reason should mention network change: {reason}"
            );
        }
        other => panic!("expected TierChanged, got {other:?}"),
    }

    assert_eq!(
        publisher.count(),
        1,
        "DID document should be republished after network change"
    );
    handle.stop();
}
/// Runs the re-evaluation task with a strategy that always returns the
/// initial tier, waits 200 ms (several test intervals), and checks that
/// nothing was republished and no event was emitted.
#[tokio::test]
async fn no_event_when_tier_unchanged_after_reevaluation() {
    let did = "did:dht:unchanged123";
    let initial_url = "ws://198.51.100.7:32891/scp/v1";

    let addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let strategy = Arc::new(SequenceNatStrategy::new(vec![ReachabilityTier::Stun {
        external_addr: addr,
    }]));
    let publisher = Arc::new(RecordingPublisher::new());
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);

    let relay_service = scp_identity::document::Service {
        id: format!("{did}#scp-relay-1"),
        service_type: "SCPRelay".to_owned(),
        service_endpoint: initial_url.to_owned(),
    };
    let document = DidDocument {
        context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
        id: did.to_owned(),
        verification_method: vec![],
        authentication: vec![],
        assertion_method: vec![],
        also_known_as: vec![],
        service: vec![relay_service],
    };
    let identity = ScpIdentity {
        identity_key: scp_platform::KeyHandle::new(1),
        active_signing_key: scp_platform::KeyHandle::new(2),
        agent_signing_key: None,
        pre_rotation_commitment: [0u8; 32],
        did: did.to_owned(),
    };

    let handle = spawn_tier_reevaluation(
        Arc::clone(&strategy) as Arc<dyn NatStrategy>,
        None,
        Arc::clone(&publisher) as Arc<dyn DidPublisher>,
        identity,
        document,
        32891,
        initial_url.to_owned(),
        Some(event_tx),
        TEST_REEVALUATION_INTERVAL,
    );

    // Give the loop time for several re-evaluations.
    tokio::time::sleep(Duration::from_millis(200)).await;

    assert_eq!(
        publisher.count(),
        0,
        "DID document should NOT be republished when tier is unchanged"
    );
    let pending = event_rx.try_recv();
    assert!(
        pending.is_err(),
        "no TierChanged event should be emitted when tier is unchanged"
    );
    handle.stop();
}
/// NAT strategy test double whose first call fails and whose later calls
/// succeed.
struct FailThenSucceedStrategy {
    /// Number of `select_tier` calls made so far.
    call_count: std::sync::atomic::AtomicU32,
    /// Tier returned by every call after the first.
    success_tier: ReachabilityTier,
}

impl NatStrategy for FailThenSucceedStrategy {
    /// Resolves to `NodeError::Nat("transient STUN failure")` on the first
    /// call and to a clone of `success_tier` on every later call. The call
    /// is counted when this method is invoked, not when the future is
    /// polled.
    fn select_tier(
        &self,
        _relay_port: u16,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<ReachabilityTier, NodeError>> + Send + '_>,
    > {
        use std::sync::atomic::Ordering;

        let attempt = self.call_count.fetch_add(1, Ordering::SeqCst);
        let on_success = self.success_tier.clone();
        Box::pin(async move {
            match attempt {
                0 => Err(NodeError::Nat("transient STUN failure".into())),
                _ => Ok(on_success),
            }
        })
    }
}
/// Runs the re-evaluation task with a strategy whose first probe fails and
/// checks that a `TierChanged` event still arrives afterwards and the DID
/// document has been republished once.
#[tokio::test]
async fn reevaluation_loop_survives_nat_probe_failure() {
    let did = "did:dht:resilient123";
    let addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let new_addr = SocketAddr::from(([10, 0, 0, 1], 5000));

    let strategy = Arc::new(FailThenSucceedStrategy {
        call_count: std::sync::atomic::AtomicU32::new(0),
        success_tier: ReachabilityTier::Stun {
            external_addr: new_addr,
        },
    });
    let publisher = Arc::new(RecordingPublisher::new());
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);

    let relay_service = scp_identity::document::Service {
        id: format!("{did}#scp-relay-1"),
        service_type: "SCPRelay".to_owned(),
        service_endpoint: format!("ws://{addr}/scp/v1"),
    };
    let document = DidDocument {
        context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
        id: did.to_owned(),
        verification_method: vec![],
        authentication: vec![],
        assertion_method: vec![],
        also_known_as: vec![],
        service: vec![relay_service],
    };
    let identity = ScpIdentity {
        identity_key: scp_platform::KeyHandle::new(1),
        active_signing_key: scp_platform::KeyHandle::new(2),
        agent_signing_key: None,
        pre_rotation_commitment: [0u8; 32],
        did: did.to_owned(),
    };

    let handle = spawn_tier_reevaluation(
        strategy as Arc<dyn NatStrategy>,
        None,
        Arc::clone(&publisher) as Arc<dyn DidPublisher>,
        identity,
        document,
        addr.port(),
        format!("ws://{addr}/scp/v1"),
        Some(event_tx),
        TEST_REEVALUATION_INTERVAL,
    );

    let received = tokio::time::timeout(TEST_EVENT_TIMEOUT, event_rx.recv())
        .await
        .expect("timeout waiting for tier change event after recovery")
        .expect("channel closed unexpectedly");
    assert!(matches!(received, NatTierChange::TierChanged { .. }));

    assert_eq!(
        publisher.count(),
        1,
        "republish after successful re-evaluation"
    );
    handle.stop();
}
/// A node built in no-domain mode must carry both a tier re-evaluation
/// handle and a tier-change receiver.
#[tokio::test]
async fn no_domain_build_spawns_tier_reevaluation_task() {
    let external_addr = SocketAddr::from(([198, 51, 100, 7], 32891));
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_no_domain_builder(ReachabilityTier::Stun { external_addr })
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.tier_reeval.is_some(),
        "no-domain mode should spawn the tier re-evaluation task"
    );
    assert!(
        node.tier_change_rx.is_some(),
        "no-domain mode should provide a tier change event channel"
    );

    node.shutdown();
}
/// A node built from `test_builder()` (domain mode) must carry neither a
/// tier re-evaluation handle nor a tier-change receiver.
#[tokio::test]
async fn domain_build_does_not_spawn_tier_reevaluation_task() {
    let loopback_any_port = SocketAddr::from(([127, 0, 0, 1], 0));

    let node = test_builder()
        .bind_addr(loopback_any_port)
        .build_for_testing()
        .await
        .unwrap();

    assert!(
        node.tier_reeval.is_none(),
        "domain mode should NOT spawn the tier re-evaluation task"
    );
    assert!(
        node.tier_change_rx.is_none(),
        "domain mode should NOT provide a tier change event channel"
    );

    node.shutdown();
}
/// Builds a node with `identity_with_storage` over empty storage and checks
/// that an identity is created and that a versioned envelope holding the
/// same DID is stored under `IDENTITY_STORAGE_KEY`.
#[tokio::test]
async fn identity_with_storage_creates_and_persists_on_first_run() {
    let storage = Arc::new(InMemoryStorage::new());
    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht_method = Arc::new(make_test_dht(&key_custody));
    let tls = Arc::new(SucceedingTlsProvider {
        domain: "persist.example.com".to_owned(),
    });

    let node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&storage))
        .domain("persist.example.com")
        .tls_provider(tls)
        .identity_with_storage(key_custody, dht_method)
        .bind_addr(SocketAddr::from(([127, 0, 0, 1], 0)))
        .build_for_testing()
        .await
        .unwrap();

    let did = node.identity().did().to_owned();
    assert!(
        did.starts_with("did:dht:"),
        "DID should start with did:dht:"
    );

    // Read the raw bytes back through the shared storage handle.
    let persisted_bytes = storage
        .retrieve(IDENTITY_STORAGE_KEY)
        .await
        .unwrap()
        .expect("identity should be persisted to storage");
    let envelope: StoredValue<PersistedIdentity> =
        rmp_serde::from_slice(&persisted_bytes).unwrap();

    assert_eq!(envelope.version, CURRENT_STORE_VERSION);
    assert_eq!(envelope.data.identity.did, did);
    assert_eq!(envelope.data.document.id, did);

    node.shutdown();
}
/// Two consecutive builds sharing the same storage, custody and DID method
/// must report the same DID.
#[tokio::test]
async fn identity_with_storage_reloads_on_subsequent_run() {
    const DOMAIN: &str = "reload.example.com";
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

    let shared_store = Arc::new(InMemoryStorage::new());
    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht = Arc::new(make_test_dht(&key_custody));

    // First run: build, record the DID, then shut the node down.
    let first_node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .identity_with_storage(Arc::clone(&key_custody), Arc::clone(&dht))
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();
    let first_did = first_node.identity().did().to_owned();
    first_node.shutdown();

    // Second run: same storage, custody and DID method.
    let second_node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .identity_with_storage(key_custody, dht)
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();

    assert_eq!(
        second_node.identity().did(),
        first_did,
        "second run should produce the same DID"
    );
    second_node.shutdown();
}
/// After a first build persists an identity, a second build against the same
/// storage but with a fresh `InMemoryKeyCustody` must fail with an error whose
/// message contains "not found in custody".
#[tokio::test]
async fn identity_with_storage_rejects_mismatched_custody() {
    const DOMAIN: &str = "mismatch.example.com";
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

    let shared_store = Arc::new(InMemoryStorage::new());

    // First run populates storage using the original custody.
    let original_custody = Arc::new(InMemoryKeyCustody::new());
    let original_dht = Arc::new(make_test_dht(&original_custody));
    let seeded_node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .identity_with_storage(original_custody, Arc::clone(&original_dht))
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();
    seeded_node.shutdown();

    // Second run reuses the storage but supplies a brand-new custody.
    let other_custody = Arc::new(InMemoryKeyCustody::new());
    let other_dht = Arc::new(make_test_dht(&other_custody));
    let err = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .identity_with_storage(other_custody, other_dht)
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .err()
        .expect("build should fail with mismatched custody");

    let msg = err.to_string();
    assert!(
        msg.contains("not found in custody"),
        "expected custody validation error, got: {msg}"
    );
}
/// A `StoredValue` envelope wrapping a `PersistedIdentity` must survive a
/// MessagePack round trip (`rmp_serde::to_vec_named` then
/// `rmp_serde::from_slice`) with its version, identity DID and document id
/// intact.
///
/// The body performs no `.await`, so this is a plain synchronous `#[test]`
/// rather than a `#[tokio::test]` that would start a runtime for nothing.
#[test]
fn identity_with_storage_stored_value_envelope_roundtrip() {
    use scp_platform::traits::KeyHandle;

    let persisted = PersistedIdentity {
        identity: ScpIdentity {
            identity_key: KeyHandle::new(1),
            active_signing_key: KeyHandle::new(2),
            agent_signing_key: None,
            pre_rotation_commitment: [0u8; 32],
            did: "did:dht:zroundtrip".to_owned(),
        },
        document: DidDocument {
            context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
            id: "did:dht:zroundtrip".to_owned(),
            verification_method: vec![],
            authentication: vec![],
            assertion_method: vec![],
            also_known_as: vec![],
            service: vec![],
        },
    };

    // Serialize with a borrowed payload, then decode into the owned form.
    let envelope = StoredValue {
        version: CURRENT_STORE_VERSION,
        data: &persisted,
    };
    let bytes = rmp_serde::to_vec_named(&envelope).unwrap();
    let decoded: StoredValue<PersistedIdentity> = rmp_serde::from_slice(&bytes).unwrap();

    assert_eq!(decoded.version, CURRENT_STORE_VERSION);
    assert_eq!(decoded.data.identity.did, "did:dht:zroundtrip");
    assert_eq!(decoded.data.document.id, "did:dht:zroundtrip");
}
/// When storage already holds an identity envelope whose version is
/// `CURRENT_STORE_VERSION + 1`, the build must fail with an error whose
/// message contains "newer than supported version".
#[tokio::test]
async fn identity_with_storage_rejects_future_version() {
    use scp_platform::traits::KeyHandle;

    const DOMAIN: &str = "future-ver.example.com";
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

    let persisted = PersistedIdentity {
        identity: ScpIdentity {
            identity_key: KeyHandle::new(1),
            active_signing_key: KeyHandle::new(2),
            agent_signing_key: None,
            pre_rotation_commitment: [0u8; 32],
            did: "did:dht:zfuture".to_owned(),
        },
        document: DidDocument {
            context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
            id: "did:dht:zfuture".to_owned(),
            verification_method: vec![],
            authentication: vec![],
            assertion_method: vec![],
            also_known_as: vec![],
            service: vec![],
        },
    };

    // Pre-seed storage with an envelope one version ahead of the current one.
    let too_new = StoredValue {
        version: CURRENT_STORE_VERSION + 1,
        data: &persisted,
    };
    let bytes = rmp_serde::to_vec_named(&too_new).unwrap();
    let seeded_store = Arc::new(InMemoryStorage::new());
    seeded_store
        .store(IDENTITY_STORAGE_KEY, &bytes)
        .await
        .unwrap();

    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht = Arc::new(make_test_dht(&key_custody));
    let outcome = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&seeded_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .identity_with_storage(key_custody, dht)
        .bind_addr(loopback)
        .build_for_testing()
        .await;

    // An unexpected success still shuts the node down before failing the test.
    let err = match outcome {
        Ok(node) => {
            node.shutdown();
            panic!("expected future version rejection, but build succeeded");
        }
        Err(err) => err,
    };

    let msg = err.to_string();
    assert!(
        msg.contains("newer than supported version"),
        "expected future version rejection error, got: {msg}"
    );
}
/// Building with `generate_identity_with` must produce a `did:dht:` DID while
/// leaving nothing stored under `IDENTITY_STORAGE_KEY`.
#[tokio::test]
async fn generate_identity_with_does_not_persist() {
    const DOMAIN: &str = "nopersist.example.com";
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

    let backing_store = Arc::new(InMemoryStorage::new());
    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht = Arc::new(make_test_dht(&key_custody));

    let node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&backing_store))
        .domain(DOMAIN)
        .tls_provider(Arc::new(SucceedingTlsProvider {
            domain: DOMAIN.to_owned(),
        }))
        .generate_identity_with(key_custody, dht)
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();

    assert!(node.identity().did().starts_with("did:dht:"));

    // The identity key must be absent from the storage handed to the builder.
    let persisted = backing_store.retrieve(IDENTITY_STORAGE_KEY).await.unwrap();
    assert!(
        persisted.is_none(),
        "generate_identity_with should NOT persist identity"
    );

    node.shutdown();
}
/// Two consecutive `no_domain()` builds sharing storage, custody and DID
/// method (each given a `MockNatStrategy` with the same UPnP tier) must report
/// the same DID.
#[tokio::test]
async fn identity_with_storage_no_domain_mode() {
    let loopback = SocketAddr::from(([127, 0, 0, 1], 0));

    let shared_store = Arc::new(InMemoryStorage::new());
    let key_custody = Arc::new(InMemoryKeyCustody::new());
    let dht = Arc::new(make_test_dht(&key_custody));
    let upnp_tier = ReachabilityTier::Upnp {
        external_addr: SocketAddr::from(([1, 2, 3, 4], 9090)),
    };

    // First run: build, record the DID, then shut the node down.
    let first_node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .no_domain()
        .nat_strategy(Arc::new(MockNatStrategy {
            tier: upnp_tier.clone(),
        }))
        .identity_with_storage(Arc::clone(&key_custody), Arc::clone(&dht))
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();
    let first_did = first_node.identity().did().to_owned();
    first_node.shutdown();

    // Second run: same storage, custody and DID method; the tier is moved in.
    let second_node = ApplicationNodeBuilder::new()
        .storage(Arc::clone(&shared_store))
        .no_domain()
        .nat_strategy(Arc::new(MockNatStrategy { tier: upnp_tier }))
        .identity_with_storage(key_custody, dht)
        .bind_addr(loopback)
        .build_for_testing()
        .await
        .unwrap();

    assert_eq!(
        second_node.identity().did(),
        first_did,
        "no-domain mode should also reload persisted identity"
    );
    second_node.shutdown();
}
}