Skip to main content

bestool_canopy/
client.rs

1use std::{
2	fmt,
3	io::Write,
4	net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5	sync::{Arc, OnceLock},
6	time::Duration,
7};
8
9use flate2::{Compression, write::GzEncoder};
10use hickory_resolver::{
11	ConnectionProvider, Resolver,
12	config::{ConnectionConfig, NameServerConfig, ResolverConfig},
13	net::runtime::TokioRuntimeProvider,
14};
15use miette::{IntoDiagnostic, Result, WrapErr};
16use rcgen::{CertificateParams, DistinguishedName, DnType, KeyPair};
17use reqwest::Url;
18use time::{Duration as TimeDuration, OffsetDateTime};
19use tokio::sync::RwLock;
20use tracing::debug;
21use uuid::Uuid;
22
23use crate::{
24	Redacted,
25	backup::TargetOutcome,
26	restore::{RestoreCapabilitiesRequest, RestoreCredentialsRequest},
27	schema::{
28		BackupPurpose, BackupTarget, CapabilitiesArgs, CredentialProcessOutput, CredentialsArgs,
29		NewEvent, ReportArgs, RestoreCredentials, VerificationArgs, WorklistEntry,
30	},
31};
32
/// Default canopy base URL: the public (non-tailnet) endpoint.
pub const DEFAULT_CANOPY_URL: &str = "https://meta.tamanu.app";

/// Base URL for the tailscale-internal canopy endpoint.
///
/// On hosts that share the canopy tailnet, posting to this URL works without
/// mTLS — the tailscale identity is the auth.
pub const TAILSCALE_URL: &str = "https://canopy.tail53aef.ts.net";

/// Bare hostname used for `resolve_to_addrs` overrides.
///
/// Must stay identical to the host part of [`TAILSCALE_URL`]; otherwise the
/// resolution override would not apply to requests made against that URL.
const TAILSCALE_HOST: &str = "canopy.tail53aef.ts.net";

/// Hardcoded tailscale IPs for canopy, used when tailscale DNS
/// (100.100.100.100) is unreachable but the tailnet otherwise is.
const CANOPY_HARDCODED_V4: Ipv4Addr = Ipv4Addr::new(100, 99, 98, 97);
const CANOPY_HARDCODED_V6: Ipv6Addr =
	Ipv6Addr::new(0xfd7a, 0x115c, 0xa1e0, 0, 0, 0, 0x9337, 0xfb52);

/// How long renewed canopy certs are valid for.
///
/// Kept above [`CERT_RENEW_AFTER`] (one day of slack at the current values) so
/// a single renewal failure doesn't immediately strand the client. The ordering
/// is enforced at compile time below.
const CERT_VALIDITY_DAYS: i64 = 6;

/// How long to wait between scheduled cert renewals.
///
/// Renewal runs in a background task in the daemon; the legacy single-shot
/// alerts command builds the client once and exits well within this window.
pub const CERT_RENEW_AFTER: Duration = Duration::from_secs(5 * 24 * 60 * 60);

// Guard the invariant documented on `CERT_VALIDITY_DAYS`: if someone shortens
// the validity (or lengthens the renewal interval) so certs would expire before
// the next scheduled renewal, fail the build instead of stranding clients.
const _: () = assert!(
	CERT_VALIDITY_DAYS * 24 * 60 * 60 > CERT_RENEW_AFTER.as_secs() as i64,
	"CERT_VALIDITY_DAYS must exceed CERT_RENEW_AFTER"
);

/// Timeout for the tailscale availability probe.
const TAILSCALE_PROBE_TIMEOUT: Duration = Duration::from_secs(5);
64
/// Factory producing the base [`reqwest::ClientBuilder`] for canopy's clients.
///
/// The caller supplies this so it owns cross-cutting client config (user-agent,
/// `SSLKEYLOGFILE`, proxies, …). Canopy invokes it whenever it needs to build or
/// rebuild a client — at probe time, on mTLS cert renewal, and on reload — then
/// layers its own concerns (mTLS identity, DNS overrides, timeouts) on top.
///
/// Held behind an [`Arc`] so a single factory can be stored and invoked
/// repeatedly. It is `Send + Sync` so it, and any client holding it, can be
/// shared across threads.
pub type ClientBuilderFactory = Arc<dyn Fn() -> reqwest::ClientBuilder + Send + Sync>;
72
73/// Browser-style user-agent string, e.g. `bestool/1.2.3 (Linux 7.0.9 Arch Linux; x86_64)`.
74///
75/// `product` and `version` identify the calling binary; the OS comment is
76/// detected at runtime and cached.
77pub fn user_agent(product: &str, version: &str) -> String {
78	static OS_COMMENT: OnceLock<String> = OnceLock::new();
79	let os_comment = OS_COMMENT.get_or_init(|| {
80		let os = sysinfo::System::long_os_version()
81			.or_else(sysinfo::System::name)
82			.unwrap_or_else(|| std::env::consts::OS.to_owned());
83		format!("{os}; {}", sysinfo::System::cpu_arch())
84	});
85	format!("{product}/{version} ({os_comment})")
86}
87
88/// A [`reqwest::ClientBuilder`] carrying the `bestool` [`user_agent`] for `version`.
89///
90/// Convenience for callers that don't need any extra client config; suitable as
91/// the base of a [`ClientBuilderFactory`].
92pub fn client_builder(version: &str) -> reqwest::ClientBuilder {
93	reqwest::Client::builder().user_agent(user_agent("bestool", version))
94}
95
/// Probe the canopy tailnet endpoint, returning a client routed to it if
/// reachable.
///
/// The returned client carries the same DNS / hardcoded-IP resolution override
/// the reporting client uses and presents **no** client certificate — callers
/// reaching canopy this way authenticate by tailnet identity. Returns `None`
/// when the tailnet endpoint isn't reachable, so callers can fall back to
/// public mTLS.
///
/// Nothing is cached. Each call runs a fresh probe (a tailscale DNS lookup, then
/// up to two HTTP probes), so prefer calling it once and reusing the client.
pub async fn tailscale_client(make_builder: &ClientBuilderFactory) -> Option<reqwest::Client> {
	probe_tailscale(make_builder).await
}
107
/// HTTP client with auth configured for talking to a canopy server.
///
/// Tries two auth paths in order of preference:
/// 1. **Tailscale**: if the canopy tailnet endpoint is reachable, plain HTTPS
///    works (auth is implicit via tailscale identity).
/// 2. **mTLS**: a fresh self-signed cert from the device key, short-lived
///    ([`CERT_VALIDITY_DAYS`]); for long-running daemons, [`Self::renew`]
///    should tick on [`CERT_RENEW_AFTER`] to swap in a fresh cert before expiry.
///
/// [`Self::refresh`] re-probes tailscale and swaps modes on reload.
pub struct CanopyClient {
	/// Device key PEM used to mint mTLS certificates. `None` means there is no
	/// mTLS fallback: [`Self::renew`] is a no-op, and [`Self::refresh`] can only
	/// switch to tailscale or keep the current state.
	device_key: Option<Redacted<String>>,
	/// Tamanu version of the install this client speaks for. Sent verbatim in
	/// the `X-Version` request header — canopy rejects events / status pushes
	/// that don't carry one. Sourced from the running Tamanu install's
	/// `package.json` (via `find_tamanu`); not the bestool / alertd version.
	tamanu_version: String,
	/// Produces the base client builder; see [`ClientBuilderFactory`].
	make_builder: ClientBuilderFactory,
	/// Active auth path and its HTTP client. Request methods clone the client
	/// out under a short read lock; `refresh` / `renew` replace it under the
	/// write lock.
	state: RwLock<State>,
}
129
130enum State {
131	Tailscale(reqwest::Client),
132	Mtls(reqwest::Client),
133}
134
135impl State {
136	fn is_tailscale(&self) -> bool {
137		matches!(self, State::Tailscale(_))
138	}
139
140	fn http(&self) -> reqwest::Client {
141		match self {
142			State::Tailscale(http) | State::Mtls(http) => http.clone(),
143		}
144	}
145}
146
147impl fmt::Debug for CanopyClient {
148	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149		f.debug_struct("CanopyClient").finish_non_exhaustive()
150	}
151}
152
153impl CanopyClient {
154	/// Build a canopy client, preferring tailscale and falling back to mTLS.
155	///
156	/// Probes the tailscale canopy endpoint first; if reachable, uses it.
157	/// Otherwise, if a device key PEM is provided, builds an mTLS client.
158	/// Returns `Ok(None)` if neither path is available.
159	///
160	/// `tamanu_version` is the version of the Tamanu install this client
161	/// speaks for; sent on every request via the `X-Version` header.
162	///
163	/// `make_builder` supplies the base [`reqwest::ClientBuilder`] — see
164	/// [`ClientBuilderFactory`]. Use [`client_builder`] for a sensible default.
165	pub async fn new(
166		tamanu_version: impl Into<String>,
167		device_key_pem: Option<&str>,
168		make_builder: impl Fn() -> reqwest::ClientBuilder + Send + Sync + 'static,
169	) -> Result<Option<Self>> {
170		let tamanu_version = tamanu_version.into();
171		let device_key = device_key_pem.map(|s| Redacted(s.to_owned()));
172		let make_builder: ClientBuilderFactory = Arc::new(make_builder);
173
174		if let Some(http) = probe_tailscale(&make_builder).await {
175			debug!("canopy: tailscale endpoint reachable, preferring it");
176			return Ok(Some(Self {
177				device_key,
178				tamanu_version,
179				make_builder,
180				state: RwLock::new(State::Tailscale(http)),
181			}));
182		}
183
184		if let Some(pem) = device_key_pem {
185			debug!("canopy: tailscale unreachable, falling back to mTLS");
186			let http = build_mtls_http(&make_builder, pem)?;
187			return Ok(Some(Self {
188				device_key,
189				tamanu_version,
190				make_builder,
191				state: RwLock::new(State::Mtls(http)),
192			}));
193		}
194
195		Ok(None)
196	}
197
198	/// Returns true if the client is currently using the tailscale path.
199	pub async fn is_tailscale(&self) -> bool {
200		self.state.read().await.is_tailscale()
201	}
202
203	/// Re-probe tailscale and swap modes if the picture has changed.
204	///
205	/// Intended to be called when the daemon receives a reload signal.
206	pub async fn refresh(&self) -> Result<()> {
207		if let Some(http) = probe_tailscale(&self.make_builder).await {
208			let mut state = self.state.write().await;
209			if !state.is_tailscale() {
210				debug!("canopy refresh: switching to tailscale path");
211			}
212			*state = State::Tailscale(http);
213			return Ok(());
214		}
215
216		if let Some(pem) = &self.device_key {
217			let http = build_mtls_http(&self.make_builder, &pem.0)?;
218			let mut state = self.state.write().await;
219			if state.is_tailscale() {
220				debug!("canopy refresh: tailscale dropped, falling back to mTLS");
221			}
222			*state = State::Mtls(http);
223			return Ok(());
224		}
225
226		debug!("canopy refresh: no auth path available, keeping current state");
227		Ok(())
228	}
229
230	/// Rebuild the underlying HTTP client with a fresh certificate.
231	///
232	/// No-op in tailscale mode (no cert to rotate). In mTLS mode, atomically
233	/// replaces the live client; in-flight requests continue with the old
234	/// client until they complete.
235	pub async fn renew(&self) -> Result<()> {
236		let Some(pem) = &self.device_key else {
237			return Ok(());
238		};
239		let mut state = self.state.write().await;
240		if state.is_tailscale() {
241			return Ok(());
242		}
243		*state = State::Mtls(build_mtls_http(&self.make_builder, &pem.0)?);
244		Ok(())
245	}
246
247	/// POST a status snapshot to the canopy server.
248	///
249	/// In tailscale mode, `base_url` is ignored and a `{TAILSCALE_URL}/public/status/{server_id}`
250	/// URL is used. In mTLS mode, posts to `{base_url}/status/{server_id}`.
251	///
252	/// The payload is free-form JSON; the canopy `/status` contract reserves the
253	/// top-level `health: []` key, whose entries each carry a `result` of
254	/// `passed | warning | failed | broken | skipped`. The body is gzip-encoded
255	/// with `Content-Encoding: gzip`.
256	///
257	/// Returns `backup_now`: the backup-type names canopy says this server should
258	/// back up right now (operator one-offs + schedule-due). Empty means nothing
259	/// to do. A response that predates the field (no `backup_now`) yields an empty
260	/// list, so older canopy deployments keep working.
261	pub async fn post_status(
262		&self,
263		base_url: &Url,
264		server_id: &str,
265		payload: &serde_json::Value,
266	) -> Result<Vec<String>> {
267		let (http, url) = {
268			let state = self.state.read().await;
269			let url = match &*state {
270				State::Tailscale(_) => format!("{TAILSCALE_URL}/public/status/{server_id}")
271					.parse::<Url>()
272					.into_diagnostic()
273					.wrap_err("building tailscale /public/status URL")?,
274				State::Mtls(_) => base_url
275					.join(&format!("/status/{server_id}"))
276					.into_diagnostic()
277					.wrap_err("building /status URL")?,
278			};
279			(state.http(), url)
280		};
281
282		let raw = serde_json::to_vec(payload)
283			.into_diagnostic()
284			.wrap_err("serialising canopy /status payload")?;
285		let compressed = gzip_bytes(&raw)
286			.into_diagnostic()
287			.wrap_err("gzipping canopy /status payload")?;
288
289		debug!(
290			%url,
291			raw_bytes = raw.len(),
292			gzip_bytes = compressed.len(),
293			"posting status snapshot to canopy",
294		);
295
296		let response = http
297			.post(url)
298			.header("X-Version", &self.tamanu_version)
299			.header(reqwest::header::CONTENT_TYPE, "application/json")
300			.header(reqwest::header::CONTENT_ENCODING, "gzip")
301			.body(compressed)
302			.send()
303			.await
304			.into_diagnostic()
305			.wrap_err("posting status to canopy")?;
306
307		let status = response.status();
308		if !status.is_success() {
309			let body = response.text().await.unwrap_or_default();
310			return Err(miette::miette!("canopy /status returned {status}: {body}"));
311		}
312
313		#[derive(serde::Deserialize, Default)]
314		struct StatusResponseTail {
315			#[serde(default)]
316			backup_now: Vec<String>,
317		}
318
319		// The response flattens the persisted Status plus `backup_now`; we read
320		// only the latter and ignore the rest. A body that fails to parse (or
321		// predates the field) is treated as "nothing to do" rather than failing
322		// the status push.
323		let tail = response
324			.json::<StatusResponseTail>()
325			.await
326			.unwrap_or_default();
327		Ok(tail.backup_now)
328	}
329
330	/// GET a path on the canopy server, routed via tailscale when available.
331	///
332	/// In tailscale mode, the request goes to `{TAILSCALE_URL}{tailscale_path}`
333	/// (typically `/public/...`, the only mount that accepts tagged-device
334	/// tailscale callers). In mTLS mode, the request goes to `{base_url}{mtls_path}`.
335	///
336	/// Returns the raw response — the caller is responsible for status checks
337	/// and body parsing so they can choose how to fall back if the response
338	/// isn't usable.
339	pub async fn get(
340		&self,
341		base_url: &Url,
342		tailscale_path: &str,
343		mtls_path: &str,
344	) -> Result<reqwest::Response> {
345		let (http, url) = {
346			let state = self.state.read().await;
347			let url = match &*state {
348				State::Tailscale(_) => format!("{TAILSCALE_URL}{tailscale_path}")
349					.parse::<Url>()
350					.into_diagnostic()
351					.wrap_err("building tailscale GET URL")?,
352				State::Mtls(_) => base_url
353					.join(mtls_path)
354					.into_diagnostic()
355					.wrap_err("building mTLS GET URL")?,
356			};
357			(state.http(), url)
358		};
359
360		debug!(%url, "GET via canopy");
361		http.get(url)
362			.header("X-Version", &self.tamanu_version)
363			.send()
364			.await
365			.into_diagnostic()
366			.wrap_err("GET via canopy")
367	}
368
369	/// POST an event to the canopy server.
370	///
371	/// In tailscale mode, `base_url` is ignored and [`TAILSCALE_URL`] is used.
372	/// In mTLS mode, posts to `{base_url}/events`.
373	pub async fn post_event(&self, base_url: &Url, event: NewEvent) -> Result<()> {
374		let (http, url) = {
375			let state = self.state.read().await;
376			let url = match &*state {
377				State::Tailscale(_) => format!("{TAILSCALE_URL}/public/events")
378					.parse::<Url>()
379					.into_diagnostic()
380					.wrap_err("building tailscale /public/events URL")?,
381				State::Mtls(_) => base_url
382					.join("/events")
383					.into_diagnostic()
384					.wrap_err("building /events URL")?,
385			};
386			(state.http(), url)
387		};
388
389		debug!(
390			%url,
391			source = event.source,
392			r#ref = event.ref_,
393			active = ?event.active,
394			"posting event to canopy"
395		);
396
397		let response = http
398			.post(url)
399			.header("X-Version", &self.tamanu_version)
400			.json(&event)
401			.send()
402			.await
403			.into_diagnostic()
404			.wrap_err("posting event to canopy")?;
405
406		let status = response.status();
407		if !status.is_success() {
408			let body = response.text().await.unwrap_or_default();
409			return Err(miette::miette!("canopy /events returned {status}: {body}"));
410		}
411
412		Ok(())
413	}
414
415	/// Resolve an endpoint URL for the current auth path.
416	///
417	/// `path` is the mTLS-mode path (e.g. `/backup-target`); over tailscale the
418	/// same endpoint is mounted under `/public`, so this prepends it.
419	async fn endpoint_url(&self, base_url: &Url, path: &str) -> Result<(reqwest::Client, Url)> {
420		let state = self.state.read().await;
421		let url = match &*state {
422			State::Tailscale(_) => format!("{TAILSCALE_URL}/public{path}")
423				.parse::<Url>()
424				.into_diagnostic()
425				.wrap_err_with(|| format!("building tailscale /public{path} URL"))?,
426			State::Mtls(_) => base_url
427				.join(path)
428				.into_diagnostic()
429				.wrap_err_with(|| format!("building {path} URL"))?,
430		};
431		Ok((state.http(), url))
432	}
433
434	/// Start a request to an arbitrary canopy endpoint on the current auth path.
435	///
436	/// This is the generic escape hatch behind the typed endpoint methods: it
437	/// resolves the right HTTP client and URL for the active auth mode, begins
438	/// the request, and sets the `X-Version` header. The returned builder is
439	/// yours to finish — add query params, a body, or extra headers, then
440	/// `.send()` and parse the response however suits.
441	///
442	/// `path` is the mTLS-mode path (e.g. `/backup-target`); over tailscale the
443	/// same endpoint is mounted under `/public`, so this routes it there, the
444	/// same convention the other endpoint methods follow.
445	pub async fn request(
446		&self,
447		method: reqwest::Method,
448		base_url: &Url,
449		path: &str,
450	) -> Result<reqwest::RequestBuilder> {
451		let (http, url) = self.endpoint_url(base_url, path).await?;
452		debug!(%url, %method, "arbitrary canopy request");
453		Ok(http
454			.request(method, url)
455			.header("X-Version", &self.tamanu_version))
456	}
457
458	/// Call an arbitrary canopy endpoint and parse its JSON response.
459	///
460	/// Builds the request via [`Self::request`], attaches `body` as JSON when
461	/// it's `Some`, sends it, and on a 2xx response parses the body into `Res`.
462	/// A non-success status becomes an error carrying the status and response
463	/// body, matching the other endpoint methods. This absorbs the status-check
464	/// and parse boilerplate.
465	///
466	/// Use [`serde_json::Value`] for `Res` (and/or the body) for fully dynamic
467	/// calls, or any concrete type for typed calls. When passing no body, pin
468	/// the inference with a turbofish, e.g. `None::<&()>`.
469	///
470	/// `path` follows the same mTLS/tailscale convention as [`Self::request`].
471	pub async fn request_json<Res: serde::de::DeserializeOwned>(
472		&self,
473		method: reqwest::Method,
474		base_url: &Url,
475		path: &str,
476		body: Option<&(impl serde::Serialize + ?Sized)>,
477	) -> Result<Res> {
478		let mut req = self.request(method, base_url, path).await?;
479		if let Some(body) = body {
480			req = req.json(body);
481		}
482
483		let response = req
484			.send()
485			.await
486			.into_diagnostic()
487			.wrap_err_with(|| format!("calling canopy {path}"))?;
488
489		let status = response.status();
490		if !status.is_success() {
491			let body = response.text().await.unwrap_or_default();
492			return Err(miette::miette!("canopy {path} returned {status}: {body}"));
493		}
494
495		response
496			.json::<Res>()
497			.await
498			.into_diagnostic()
499			.wrap_err_with(|| format!("parsing canopy {path} response"))
500	}
501
502	/// Register the backup types this server can run (`POST /backup-capabilities`).
503	pub async fn backup_capabilities(&self, base_url: &Url, types: &[String]) -> Result<()> {
504		let (http, url) = self.endpoint_url(base_url, "/backup-capabilities").await?;
505		debug!(%url, ?types, "registering backup capabilities with canopy");
506		let response = http
507			.post(url)
508			.header("X-Version", &self.tamanu_version)
509			.json(&CapabilitiesArgs {
510				types: types.to_vec(),
511			})
512			.send()
513			.await
514			.into_diagnostic()
515			.wrap_err("posting backup capabilities to canopy")?;
516
517		let status = response.status();
518		if !status.is_success() {
519			let body = response.text().await.unwrap_or_default();
520			return Err(miette::miette!(
521				"canopy /backup-capabilities returned {status}: {body}"
522			));
523		}
524		Ok(())
525	}
526
527	/// Obtain short-lived S3 credentials for a backup type (`POST /backup-credentials`).
528	///
529	/// Returns the `credential_process`-shaped creds; the caller translates them
530	/// to the container-creds shape for kopia. `412`/`409`/`502` surface as errors.
531	pub async fn backup_credentials(
532		&self,
533		base_url: &Url,
534		backup_type: &str,
535		purpose: BackupPurpose,
536	) -> Result<CredentialProcessOutput> {
537		let (http, url) = self.endpoint_url(base_url, "/backup-credentials").await?;
538		debug!(%url, backup_type, ?purpose, "requesting backup credentials from canopy");
539		let response = http
540			.post(url)
541			.header("X-Version", &self.tamanu_version)
542			.json(&CredentialsArgs {
543				type_: backup_type.to_owned(),
544				purpose: Some(purpose),
545			})
546			.send()
547			.await
548			.into_diagnostic()
549			.wrap_err("posting backup credentials request to canopy")?;
550
551		let status = response.status();
552		if !status.is_success() {
553			let body = response.text().await.unwrap_or_default();
554			return Err(miette::miette!(
555				"canopy /backup-credentials returned {status}: {body}"
556			));
557		}
558		response
559			.json::<CredentialProcessOutput>()
560			.await
561			.into_diagnostic()
562			.wrap_err("parsing backup credentials from canopy")
563	}
564
565	/// Fetch the S3 repo target (`GET /backup-target`).
566	///
567	/// `412`/`409` mean the device isn't yet authorised for backups; these map to
568	/// [`TargetOutcome::Dormant`] (a benign idle state) rather than an error.
569	pub async fn backup_target(&self, base_url: &Url) -> Result<TargetOutcome> {
570		let (http, url) = self.endpoint_url(base_url, "/backup-target").await?;
571		debug!(%url, "fetching backup target from canopy");
572		let response = http
573			.get(url)
574			.header("X-Version", &self.tamanu_version)
575			.send()
576			.await
577			.into_diagnostic()
578			.wrap_err("fetching backup target from canopy")?;
579
580		let status = response.status();
581		if status == reqwest::StatusCode::PRECONDITION_FAILED
582			|| status == reqwest::StatusCode::CONFLICT
583		{
584			return Ok(TargetOutcome::Dormant);
585		}
586		if !status.is_success() {
587			let body = response.text().await.unwrap_or_default();
588			return Err(miette::miette!(
589				"canopy /backup-target returned {status}: {body}"
590			));
591		}
592		let target = response
593			.json::<BackupTarget>()
594			.await
595			.into_diagnostic()
596			.wrap_err("parsing backup target from canopy")?;
597		Ok(TargetOutcome::Ready(target))
598	}
599
600	/// Report a completed backup/restore run (`POST /backup-report`).
601	pub async fn backup_report(&self, base_url: &Url, report: &ReportArgs) -> Result<()> {
602		let (http, url) = self.endpoint_url(base_url, "/backup-report").await?;
603		debug!(%url, run_id = %report.run_id, "reporting backup outcome to canopy");
604		let response = http
605			.post(url)
606			.header("X-Version", &self.tamanu_version)
607			.json(report)
608			.send()
609			.await
610			.into_diagnostic()
611			.wrap_err("posting backup report to canopy")?;
612
613		let status = response.status();
614		if !status.is_success() {
615			let body = response.text().await.unwrap_or_default();
616			return Err(miette::miette!(
617				"canopy /backup-report returned {status}: {body}"
618			));
619		}
620		Ok(())
621	}
622
623	/// Register the restore intents this consumer supports (`POST /restore-capabilities`).
624	///
625	/// Replaces the registered intent set wholesale. Canopy dispatches only
626	/// matching worklist entries.
627	pub async fn restore_capabilities(&self, base_url: &Url, intents: &[&str]) -> Result<()> {
628		let (http, url) = self.endpoint_url(base_url, "/restore-capabilities").await?;
629		debug!(%url, ?intents, "registering restore capabilities with canopy");
630		let response = http
631			.post(url)
632			.header("X-Version", &self.tamanu_version)
633			.json(&RestoreCapabilitiesRequest { intents })
634			.send()
635			.await
636			.into_diagnostic()
637			.wrap_err("posting restore capabilities to canopy")?;
638
639		let status = response.status();
640		if !status.is_success() {
641			let body = response.text().await.unwrap_or_default();
642			return Err(miette::miette!(
643				"canopy /restore-capabilities returned {status}: {body}"
644			));
645		}
646		Ok(())
647	}
648
649	/// Fetch the desired-state worklist of replicas to restore (`GET /restore-worklist`).
650	pub async fn restore_worklist(&self, base_url: &Url) -> Result<Vec<WorklistEntry>> {
651		let (http, url) = self.endpoint_url(base_url, "/restore-worklist").await?;
652		debug!(%url, "fetching restore worklist from canopy");
653		let response = http
654			.get(url)
655			.header("X-Version", &self.tamanu_version)
656			.send()
657			.await
658			.into_diagnostic()
659			.wrap_err("fetching restore worklist from canopy")?;
660
661		let status = response.status();
662		if !status.is_success() {
663			let body = response.text().await.unwrap_or_default();
664			return Err(miette::miette!(
665				"canopy /restore-worklist returned {status}: {body}"
666			));
667		}
668		response
669			.json::<Vec<WorklistEntry>>()
670			.await
671			.into_diagnostic()
672			.wrap_err("parsing restore worklist from canopy")
673	}
674
675	/// Obtain read-only S3 credentials and the repo password for a group
676	/// (`POST /restore-credentials`).
677	///
678	/// Creds are 1-hour chained STS; refresh by re-calling. `403`/`409`/`502`
679	/// surface as errors.
680	pub async fn restore_credentials(
681		&self,
682		base_url: &Url,
683		backup_type: &str,
684		group: Uuid,
685	) -> Result<RestoreCredentials> {
686		let (http, url) = self.endpoint_url(base_url, "/restore-credentials").await?;
687		debug!(%url, backup_type, %group, "requesting restore credentials from canopy");
688		let response = http
689			.post(url)
690			.header("X-Version", &self.tamanu_version)
691			.json(&RestoreCredentialsRequest {
692				group,
693				r#type: backup_type,
694			})
695			.send()
696			.await
697			.into_diagnostic()
698			.wrap_err("posting restore credentials request to canopy")?;
699
700		let status = response.status();
701		if !status.is_success() {
702			let body = response.text().await.unwrap_or_default();
703			return Err(miette::miette!(
704				"canopy /restore-credentials returned {status}: {body}"
705			));
706		}
707		response
708			.json::<RestoreCredentials>()
709			.await
710			.into_diagnostic()
711			.wrap_err("parsing restore credentials from canopy")
712	}
713
714	/// Report a restore's health (`POST /restore-verification`).
715	pub async fn restore_verification(
716		&self,
717		base_url: &Url,
718		report: &VerificationArgs,
719	) -> Result<()> {
720		let (http, url) = self.endpoint_url(base_url, "/restore-verification").await?;
721		debug!(%url, group = %report.group, "reporting restore verification to canopy");
722		let response = http
723			.post(url)
724			.header("X-Version", &self.tamanu_version)
725			.json(report)
726			.send()
727			.await
728			.into_diagnostic()
729			.wrap_err("posting restore verification to canopy")?;
730
731		let status = response.status();
732		if !status.is_success() {
733			let body = response.text().await.unwrap_or_default();
734			return Err(miette::miette!(
735				"canopy /restore-verification returned {status}: {body}"
736			));
737		}
738		Ok(())
739	}
740}
741
742/// Probe the tailscale canopy endpoint.
743///
744/// Returns a configured `reqwest::Client` if `GET /public/servers` responds
745/// 2xx — anything else (timeout, non-2xx, transport error) returns `None` so
746/// the caller can fall back to mTLS.
747///
748/// Tries two paths in order:
749/// 1. Resolve `canopy` via the tailscale DNS server (100.100.100.100) and
750///    probe with those addresses.
751/// 2. Use hardcoded tailscale IPs for canopy and probe with those.
752///
753/// `/public/servers` is used because:
754/// - it lives under `/public/...`, the only mount that accepts tagged-device
755///   tailscale callers (everything else 403s with `tagged-device-not-allowed`);
756/// - it's a `GET` with no body, no `VersionHeader` requirement, and no auth;
757/// - it's read-only, so probing it has no side effects.
758async fn probe_tailscale(make_builder: &ClientBuilderFactory) -> Option<reqwest::Client> {
759	let dns_addrs: Vec<SocketAddr> = tailscale_resolver()
760		.lookup_ip("canopy")
761		.await
762		.ok()
763		.map(|addrs| addrs.iter().map(|ip| SocketAddr::new(ip, 443)).collect())
764		.unwrap_or_default();
765	if !dns_addrs.is_empty()
766		&& let Some(client) = try_probe(&dns_addrs, make_builder).await
767	{
768		return Some(client);
769	}
770
771	let hardcoded = [
772		SocketAddr::new(IpAddr::V4(CANOPY_HARDCODED_V4), 443),
773		SocketAddr::new(IpAddr::V6(CANOPY_HARDCODED_V6), 443),
774	];
775	debug!(
776		?hardcoded,
777		"canopy tailscale DNS lookup empty or probe failed, trying hardcoded IPs"
778	);
779	try_probe(&hardcoded, make_builder).await
780}
781
782async fn try_probe(
783	addrs: &[SocketAddr],
784	make_builder: &ClientBuilderFactory,
785) -> Option<reqwest::Client> {
786	let client = make_builder()
787		.timeout(TAILSCALE_PROBE_TIMEOUT)
788		.resolve_to_addrs(TAILSCALE_HOST, addrs)
789		.build()
790		.ok()?;
791
792	let url = format!("{TAILSCALE_URL}/public/servers");
793	match client.get(&url).send().await {
794		Ok(resp) if resp.status().is_success() => Some(client),
795		Ok(resp) => {
796			debug!(status = %resp.status(), ?addrs, "canopy tailscale probe: unexpected status");
797			None
798		}
799		Err(err) => {
800			debug!(?addrs, "canopy tailscale probe failed: {err}");
801			None
802		}
803	}
804}
805
/// Build a DNS resolver that queries only the tailscale DNS server
/// (100.100.100.100) over UDP, with `tail53aef.ts.net.` as the search domain.
/// The search domain lets a bare name such as `canopy` be looked up against
/// the tailnet.
///
/// # Panics
///
/// Only if the hardcoded search-domain / nameserver literals fail to parse or
/// the resolver fails to build. All inputs are fixed, so a panic here means a
/// programming error, not a runtime condition.
fn tailscale_resolver() -> Resolver<impl ConnectionProvider> {
	Resolver::builder_with_config(
		ResolverConfig::from_parts(
			// No domain; only the search list below is configured.
			None,
			vec!["tail53aef.ts.net.".parse().unwrap()],
			vec![NameServerConfig::new(
				"100.100.100.100".parse().unwrap(),
				// NOTE(review): presumably `trust_negative_responses` — confirm
				// against the hickory version in use.
				true,
				vec![ConnectionConfig::udp()],
			)],
		),
		TokioRuntimeProvider::default(),
	)
	.build()
	.expect("tailscale resolver config is hardcoded and cannot fail to build")
}
822
823fn gzip_bytes(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
824	let mut encoder = GzEncoder::new(Vec::with_capacity(bytes.len() / 2), Compression::default());
825	encoder.write_all(bytes)?;
826	encoder.finish()
827}
828
829/// Build a short-lived self-signed client certificate from a P-256 device key
830/// PEM and wrap it as a reqwest mTLS [`Identity`].
831///
832/// Canopy identifies a device by its certificate's public key (SPKI), not by a
833/// CA chain, so a fresh self-signed cert from the device key is all that's
834/// needed. The same device key drives both the long-running canopy client here
835/// and the one-shot `canopy register` enrollment handshake, so they present the
836/// same identity to canopy.
837pub fn device_identity(device_key_pem: &str) -> Result<reqwest::Identity> {
838	let key_pair = KeyPair::from_pem(device_key_pem)
839		.into_diagnostic()
840		.wrap_err("parsing device key PEM")?;
841
842	let mut params = CertificateParams::new(vec!["device.local".into()])
843		.into_diagnostic()
844		.wrap_err("building certificate params")?;
845	params.distinguished_name = DistinguishedName::new();
846	params
847		.distinguished_name
848		.push(DnType::CommonName, "device.local");
849
850	let now = OffsetDateTime::now_utc();
851	params.not_before = now - TimeDuration::minutes(1);
852	params.not_after = now + TimeDuration::days(CERT_VALIDITY_DAYS);
853
854	let cert = params
855		.self_signed(&key_pair)
856		.into_diagnostic()
857		.wrap_err("self-signing certificate")?;
858
859	let mut combined = cert.pem();
860	combined.push('\n');
861	combined.push_str(&key_pair.serialize_pem());
862
863	reqwest::Identity::from_pem(combined.as_bytes())
864		.into_diagnostic()
865		.wrap_err("building reqwest TLS identity")
866}
867
868fn build_mtls_http(
869	make_builder: &ClientBuilderFactory,
870	device_key_pem: &str,
871) -> Result<reqwest::Client> {
872	let identity = device_identity(device_key_pem)?;
873
874	make_builder()
875		.identity(identity)
876		.use_rustls_tls()
877		.timeout(Duration::from_secs(30))
878		.build()
879		.into_diagnostic()
880		.wrap_err("building canopy HTTP client")
881}
882
#[cfg(test)]
mod tests {
	// Unit tests for the canopy client: mTLS client construction, renewal in
	// both connection modes, `request_json` against an in-process loopback
	// HTTP server, the user-agent string, and gzip compression.
	use super::*;

	/// P-256 (prime256v1) private key in PKCS#8 PEM form, used as the device
	/// key throughout these tests. Test-only material.
	const TEST_DEVICE_KEY: &str = "\
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgVvhzsYiidp38GYn1
KxD5Wipc/h8lglVsy1UFZq/SZbGhRANCAAT2EsEq7xjeWVnim9XwdYXga/LBbppm
fXLgamTYOa/w9n/Ta64fiYWmN54kEd0DgnflJDLtID321Zz6xswvK/VN
-----END PRIVATE KEY-----";

	/// Client-builder factory that returns reqwest's default `ClientBuilder`,
	/// with no extra configuration.
	fn test_factory() -> ClientBuilderFactory {
		Arc::new(reqwest::Client::builder)
	}

	/// A valid P-256 device key must produce a working mTLS client.
	#[test]
	fn build_mtls_http_from_p256_key() {
		// Direct mTLS-path build, bypassing the async constructor / tailscale probe.
		let result = build_mtls_http(&test_factory(), TEST_DEVICE_KEY);
		assert!(result.is_ok(), "{:?}", result.err());
	}

	/// Non-PEM input must be rejected with an error rather than a panic.
	#[test]
	fn build_mtls_http_fails_on_garbage_key() {
		assert!(build_mtls_http(&test_factory(), "not a real PEM").is_err());
	}

	/// Renewing an mTLS-mode client succeeds and leaves it in mTLS mode.
	#[tokio::test]
	async fn renew_with_mtls_state_swaps_in_fresh_client() {
		// Construct an mTLS-state client directly (no network probe) and renew it.
		let http = build_mtls_http(&test_factory(), TEST_DEVICE_KEY).unwrap();
		let client = CanopyClient {
			device_key: Some(Redacted(TEST_DEVICE_KEY.to_owned())),
			tamanu_version: "2.54.2".into(),
			make_builder: test_factory(),
			state: RwLock::new(State::Mtls(http)),
		};
		client.renew().await.expect("renew should succeed");
		assert!(!client.is_tailscale().await);
	}

	/// Renewing a tailscale-mode client succeeds and leaves it in tailscale mode.
	#[tokio::test]
	async fn renew_is_noop_in_tailscale_mode() {
		// Tailscale-state client with no device key — renew is a no-op.
		let http = reqwest::Client::new();
		let client = CanopyClient {
			device_key: None,
			tamanu_version: "2.54.2".into(),
			make_builder: test_factory(),
			state: RwLock::new(State::Tailscale(http)),
		};
		client.renew().await.expect("renew should be a no-op");
		assert!(client.is_tailscale().await);
	}

	/// Build an mTLS-state client directly (no tailscale probe) and pair it
	/// with `base` parsed as a URL, for passing to `request_json`.
	///
	/// Panics if `base` is not a valid URL.
	fn mtls_client_against(base: &str) -> (CanopyClient, Url) {
		let http = build_mtls_http(&test_factory(), TEST_DEVICE_KEY).unwrap();
		let client = CanopyClient {
			device_key: Some(Redacted(TEST_DEVICE_KEY.to_owned())),
			tamanu_version: "2.54.2".into(),
			make_builder: test_factory(),
			state: RwLock::new(State::Mtls(http)),
		};
		(client, base.parse().unwrap())
	}

	/// What the loopback server in [`serve_once`] saw of the single request.
	struct Captured {
		/// First line of the request, e.g. `POST /thing HTTP/1.1`.
		request_line: String,
		/// Remaining header lines, newline-joined, in the order received.
		headers: String,
		/// Request body bytes, read up to the advertised `Content-Length`.
		body: Vec<u8>,
	}

	/// Bind a loopback socket and answer exactly one HTTP request with
	/// `response`, capturing the received request line, headers, and body.
	///
	/// Returns the server's `http://127.0.0.1:<port>` base URL and a join
	/// handle that yields the [`Captured`] request after the response has been
	/// written. Only `Content-Length`-framed bodies are understood; a missing
	/// or unparseable length is treated as an empty body. A panic in the
	/// server thread (e.g. the peer closing before the header terminator)
	/// surfaces when the handle is joined.
	///
	/// NOTE(review): the socket has no read timeout, so a client that stalls
	/// mid-request would block this thread indefinitely.
	fn serve_once(response: &'static str) -> (String, std::thread::JoinHandle<Captured>) {
		use std::io::{Read, Write};
		use std::net::TcpListener;

		// Port 0 lets the OS pick a free port, so tests can run in parallel.
		let listener = TcpListener::bind("127.0.0.1:0").unwrap();
		let base = format!("http://{}", listener.local_addr().unwrap());
		let handle = std::thread::spawn(move || {
			let (mut stream, _) = listener.accept().unwrap();
			let mut buf = Vec::new();
			let mut chunk = [0u8; 1024];
			// Accumulate bytes until the blank line ending the header section.
			// `header_end` is the offset just past `\r\n\r\n`. Anything already
			// read beyond it is the start of the body.
			let header_end = loop {
				if let Some(pos) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
					break pos + 4;
				}
				let n = stream.read(&mut chunk).unwrap();
				if n == 0 {
					panic!("connection closed before headers were complete");
				}
				buf.extend_from_slice(&chunk[..n]);
			};

			let head = String::from_utf8_lossy(&buf[..header_end]).into_owned();
			// Header names are matched case-insensitively. A header whose value
			// does not parse as usize is skipped, not treated as an error.
			let content_length = head
				.lines()
				.find_map(|line| {
					let (name, value) = line.split_once(':')?;
					name.trim()
						.eq_ignore_ascii_case("content-length")
						.then(|| value.trim().parse::<usize>().ok())
						.flatten()
				})
				.unwrap_or(0);

			// Read the rest of the body. Stop early (without panicking) if the
			// peer closes first; the test's own assertions catch a short body.
			let mut body = buf[header_end..].to_vec();
			while body.len() < content_length {
				let n = stream.read(&mut chunk).unwrap();
				if n == 0 {
					break;
				}
				body.extend_from_slice(&chunk[..n]);
			}

			stream.write_all(response.as_bytes()).unwrap();
			stream.flush().unwrap();

			let mut lines = head.lines();
			let request_line = lines.next().unwrap_or_default().to_owned();
			let headers = lines.collect::<Vec<_>>().join("\n");
			Captured {
				request_line,
				headers,
				body,
			}
		});
		(base, handle)
	}

	/// JSON shape returned by the fake server in the happy-path test.
	#[derive(Debug, serde::Deserialize, PartialEq)]
	struct Echo {
		ok: bool,
		who: String,
	}

	/// `request_json` checks:
	/// - it sends the method and path it was given;
	/// - it sends an `X-Version` header carrying the client's Tamanu version;
	/// - it serialises the payload as the body;
	/// - it deserialises a 2xx JSON response.
	#[tokio::test]
	async fn request_json_sends_version_and_body_and_parses_response() {
		let (base, handle) = serve_once(
			"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 26\r\n\r\n{\"ok\":true,\"who\":\"device\"}",
		);
		let (client, base_url) = mtls_client_against(&base);

		let payload = serde_json::json!({ "hello": "world" });
		let got: Echo = client
			.request_json(reqwest::Method::POST, &base_url, "/thing", Some(&payload))
			.await
			.expect("request_json should succeed");

		assert_eq!(
			got,
			Echo {
				ok: true,
				who: "device".into()
			}
		);

		let captured = handle.join().unwrap();
		assert!(
			captured.request_line.starts_with("POST /thing "),
			"unexpected request line: {}",
			captured.request_line
		);
		// Lowercased because HTTP header names are case-insensitive and the
		// client may send `X-Version` with any casing.
		assert!(
			captured
				.headers
				.to_ascii_lowercase()
				.contains("x-version: 2.54.2"),
			"missing X-Version header in:\n{}",
			captured.headers
		);
		// Compare as parsed JSON so key order and whitespace don't matter.
		let sent: serde_json::Value = serde_json::from_slice(&captured.body).unwrap();
		assert_eq!(sent, payload);
	}

	/// A non-2xx response must become an error whose message includes the
	/// request path, the status code, and the response body text.
	#[tokio::test]
	async fn request_json_errors_on_non_success_with_body() {
		let (base, handle) =
			serve_once("HTTP/1.1 418 I'm a teapot\r\nContent-Length: 14\r\n\r\nno coffee here");
		let (client, base_url) = mtls_client_against(&base);

		let err = client
			.request_json::<serde_json::Value>(
				reqwest::Method::GET,
				&base_url,
				"/brew",
				None::<&()>,
			)
			.await
			.expect_err("non-2xx should error");
		let msg = err.to_string();
		assert!(msg.contains("/brew"), "expected path in error: {msg}");
		assert!(msg.contains("418"), "expected status in error: {msg}");
		assert!(
			msg.contains("no coffee here"),
			"expected body text in error: {msg}"
		);

		// Joining surfaces any panic from the server thread.
		handle.join().unwrap();
	}

	/// The user agent is `<product>/<version> (...)`, and the parenthesised
	/// comment includes the host CPU architecture.
	#[test]
	fn user_agent_has_product_and_os_comment() {
		let ua = user_agent("bestool", "1.2.3");
		assert!(
			ua.starts_with("bestool/1.2.3 "),
			"unexpected user-agent: {ua}"
		);
		assert!(ua.contains('('), "expected OS comment in: {ua}");
		assert!(ua.ends_with(')'), "expected OS comment in: {ua}");
		assert!(
			ua.contains(sysinfo::System::cpu_arch().as_str()),
			"expected arch in: {ua}"
		);
	}

	/// `gzip_bytes` output carries the gzip magic header and decompresses back
	/// to the original input.
	#[test]
	fn gzip_bytes_roundtrips() {
		use flate2::read::GzDecoder;
		use std::io::Read;

		let original = br#"{"health":[{"check":"x","result":"passed"}]}"#;
		let compressed = gzip_bytes(original).expect("gzip should succeed");
		assert!(
			compressed.starts_with(&[0x1f, 0x8b]),
			"expected gzip magic bytes"
		);
		let mut decoder = GzDecoder::new(&compressed[..]);
		let mut decompressed = Vec::new();
		decoder.read_to_end(&mut decompressed).unwrap();
		assert_eq!(decompressed, original);
	}
}