Skip to main content

bairelay_wake_server/
lib.rs

//! Local replacement for Reolink's P2P cloud servers — wakes battery cameras over the LAN.
//!
//! Wire-level reverse-engineering and operator-facing details live in
//! `docs/cloud-interception.md` § Part I.
5
6pub mod config;
7pub mod packet;
8pub mod registry;
9pub mod route;
10
11mod middleman;
12mod register;
13
14use std::io;
15use std::net::SocketAddr;
16
17pub use config::WakeServerConfig;
18
19/// Public error type at the library boundary.
20#[derive(Debug, thiserror::Error)]
21pub enum WakeServerError {
22	/// Surfaces port conflicts at startup so the operator sees which port is taken.
23	#[error("wake server failed to bind to {addr}: {source}")]
24	Bind { addr: SocketAddr, source: io::Error },
25
26	/// Wraps unrecoverable post-bind socket I/O so callers can distinguish it from framing errors.
27	#[error("wake server socket error: {0}")]
28	Socket(#[from] io::Error),
29
30	/// Signals corrupt or truncated BcUdp framing so we can drop the packet instead of crashing.
31	#[error("BcUdp framing error: {0}")]
32	Frame(#[from] bairelay_neolink_core::Error),
33
34	/// Signals XML payloads that don't match the BcUdp schema so malformed peers never panic the loop.
35	#[error("BcUdp XML error: {0}")]
36	Xml(#[from] quick_xml::de::DeError),
37
38	/// Carries config-validation and join-error messages without leaking internal types.
39	#[error("invalid wake-server config: {0}")]
40	Config(String),
41
42	/// Flags BcUdp packet kinds we don't expect on a given listener so we can log and drop.
43	#[error("wake server received unexpected BcUdp packet kind: {kind}")]
44	UnexpectedPacketKind { kind: &'static str },
45}
46
47use std::sync::Arc;
48use tokio::net::UdpSocket;
49use tokio_util::sync::CancellationToken;
50
51/// Build a fresh `Arc<CameraRegistry>` for the binary to share between
52/// the wake server and downstream consumers.
53/// Tests can call this too — keeps registry construction in one place.
54pub fn make_registry() -> Arc<registry::CameraRegistry> {
55	Arc::new(registry::CameraRegistry::new())
56}
57
58/// Internal entrypoint used by both production `run` and integration tests.
59/// Tests bind sockets to ephemeral ports first; production binds based on
60/// `RuntimeConfig`. Returns `Ok(())` on graceful cancellation.
61///
62/// `registry` is supplied by the caller so other tasks (e.g. the
63/// push-listener that reads source IPs out of the same map) can share state.
64pub async fn run_with_sockets(
65	cfg: config::RuntimeConfig,
66	registry: Arc<registry::CameraRegistry>,
67	middleman_sock: UdpSocket,
68	register_sock: UdpSocket,
69	cancel: CancellationToken,
70) -> Result<(), WakeServerError> {
71	let middleman_sock = Arc::new(middleman_sock);
72	let register_sock = Arc::new(register_sock);
73
74	// Per-UID `(token, ac)` map shared between the middleman (which
75	// issues anchors during `M2D_Q_R`) and the register loop (which
76	// echoes the `ac` back in `R2D_R_R`).
77	let anchors = Arc::new(registry::SessionAnchors::new());
78
79	let register_local = register_sock
80		.local_addr()
81		.map_err(WakeServerError::Socket)?;
82
83	// Spawn middleman + register loops; race against cancellation. The
84	// first listener that exits (success or error) takes the result; the
85	// other is cancelled by sharing the same token.
86	let mid = {
87		let sock = Arc::clone(&middleman_sock);
88		let cancel = cancel.clone();
89		let bind = cfg.bind;
90		let anchors = Arc::clone(&anchors);
91		tokio::spawn(
92			async move { middleman::run(sock, register_local, bind, anchors, cancel).await },
93		)
94	};
95	let reg = {
96		let sock = Arc::clone(&register_sock);
97		let cancel = cancel.clone();
98		let registry = Arc::clone(&registry);
99		let anchors = Arc::clone(&anchors);
100		let cfg = cfg.clone();
101		tokio::spawn(async move { register::run(sock, registry, anchors, cfg, cancel).await })
102	};
103
104	let res: Result<(), WakeServerError> = tokio::select! {
105		r = mid => match r {
106			Ok(inner) => inner,
107			Err(join_err) => Err(WakeServerError::Config(format!("middleman join: {join_err}"))),
108		},
109		r = reg => match r {
110			Ok(inner) => inner,
111			Err(join_err) => Err(WakeServerError::Config(format!("register join: {join_err}"))),
112		},
113	};
114	cancel.cancel();
115	res
116}
117
118/// Production entrypoint: binds the configured sockets and runs the
119/// listener pair until the cancellation token fires. The caller supplies
120/// the `Arc<CameraRegistry>` so it can be shared with the push-listener
121/// and any future readers.
122pub async fn run(
123	cfg: config::RuntimeConfig,
124	registry: Arc<registry::CameraRegistry>,
125	cancel: CancellationToken,
126) -> Result<(), WakeServerError> {
127	let middleman_addr = std::net::SocketAddr::new(cfg.bind, cfg.middleman_port);
128	let register_addr = std::net::SocketAddr::new(cfg.bind, cfg.register_port);
129	let middleman =
130		UdpSocket::bind(middleman_addr)
131			.await
132			.map_err(|source| WakeServerError::Bind {
133				addr: middleman_addr,
134				source,
135			})?;
136	let register =
137		UdpSocket::bind(register_addr)
138			.await
139			.map_err(|source| WakeServerError::Bind {
140				addr: register_addr,
141				source,
142			})?;
143	run_with_sockets(cfg, registry, middleman, register, cancel).await
144}
145
#[cfg(test)]
mod error_tests {
	use super::*;
	use std::net::Ipv4Addr;

	/// The `Bind` message must mention both the port and the underlying I/O error text.
	#[test]
	fn bind_error_displays_addr_and_source() {
		let err = WakeServerError::Bind {
			addr: SocketAddr::from((Ipv4Addr::LOCALHOST, 9999)),
			source: io::Error::new(io::ErrorKind::AddrInUse, "in use"),
		};
		let rendered = err.to_string();
		for needle in ["9999", "in use"] {
			assert!(rendered.contains(needle));
		}
	}
}