Skip to main content

bestool_canopy/
client.rs

1use std::{
2	fmt,
3	io::Write,
4	net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5	sync::{Arc, OnceLock},
6	time::Duration,
7};
8
9use flate2::{Compression, write::GzEncoder};
10use hickory_resolver::{
11	ConnectionProvider, Resolver,
12	config::{ConnectionConfig, NameServerConfig, ResolverConfig},
13	net::runtime::TokioRuntimeProvider,
14};
15use jiff::Timestamp;
16use miette::{IntoDiagnostic, Result, WrapErr};
17use rcgen::{CertificateParams, DistinguishedName, DnType, KeyPair};
18use reqwest::Url;
19use serde::{Deserialize, Serialize};
20use time::{Duration as TimeDuration, OffsetDateTime};
21use tokio::sync::RwLock;
22use tracing::debug;
23use uuid::Uuid;
24
25use crate::{
26	Redacted,
27	backup::{
28		BackupCredentials, BackupCredentialsRequest, BackupReport, BackupTarget,
29		CapabilitiesRequest, Purpose, TargetOutcome,
30	},
31	restore::{
32		RestoreCapabilitiesRequest, RestoreCredentials, RestoreCredentialsRequest,
33		RestoreVerification, WorklistEntry,
34	},
35};
36
/// Default canopy base URL.
///
/// NOTE(review): not referenced by the rest of this file; presumably what
/// callers pass as `base_url` when nothing else is configured — confirm.
pub const DEFAULT_CANOPY_URL: &str = "https://meta.tamanu.app";

/// Base URL for the tailscale-internal canopy endpoint.
///
/// On hosts that share the canopy tailnet, posting to this URL works without
/// mTLS — the tailscale identity is the auth.
///
/// The host part must stay in sync with [`TAILSCALE_HOST`].
pub const TAILSCALE_URL: &str = "https://canopy.tail53aef.ts.net";

/// Bare hostname used for `resolve_to_addrs` overrides.
///
/// Must match the host of [`TAILSCALE_URL`], otherwise the resolution override
/// would not apply to requests sent to that URL.
const TAILSCALE_HOST: &str = "canopy.tail53aef.ts.net";

/// Hardcoded tailscale IPs for canopy, used when tailscale DNS
/// (100.100.100.100) is unreachable but the tailnet otherwise is.
const CANOPY_HARDCODED_V4: Ipv4Addr = Ipv4Addr::new(100, 99, 98, 97);
const CANOPY_HARDCODED_V6: Ipv6Addr =
	Ipv6Addr::new(0xfd7a, 0x115c, 0xa1e0, 0, 0, 0, 0x9337, 0xfb52);

/// How long renewed canopy certs are valid for, in days.
///
/// Set above [`CERT_RENEW_AFTER`] (five days) so a renewal failure doesn't
/// immediately strand the client: with the current values a cert stays valid
/// for one more day after its scheduled renewal.
const CERT_VALIDITY_DAYS: i64 = 6;

/// How long to wait between scheduled cert renewals (five days).
///
/// Renewal runs in a background task in the daemon; the legacy single-shot
/// alerts command builds the client once and exits well within this window.
pub const CERT_RENEW_AFTER: Duration = Duration::from_secs(5 * 24 * 60 * 60);

/// Timeout for the tailscale availability probe.
const TAILSCALE_PROBE_TIMEOUT: Duration = Duration::from_secs(5);

/// Factory producing the base [`reqwest::ClientBuilder`] for canopy's clients.
///
/// The caller supplies this so it owns cross-cutting client config (user-agent,
/// `SSLKEYLOGFILE`, proxies, …). Canopy invokes it whenever it needs to build or
/// rebuild a client — at probe time, on mTLS cert renewal, and on reload — then
/// layers its own concerns (mTLS identity, DNS overrides, timeouts) on top.
///
/// The factory is called once per client build and must return a fresh
/// builder each time, since building a client consumes the builder.
pub type ClientBuilderFactory = Arc<dyn Fn() -> reqwest::ClientBuilder + Send + Sync>;
76
77/// Browser-style user-agent string, e.g. `bestool/1.2.3 (Linux 7.0.9 Arch Linux; x86_64)`.
78///
79/// `product` and `version` identify the calling binary; the OS comment is
80/// detected at runtime and cached.
81pub fn user_agent(product: &str, version: &str) -> String {
82	static OS_COMMENT: OnceLock<String> = OnceLock::new();
83	let os_comment = OS_COMMENT.get_or_init(|| {
84		let os = sysinfo::System::long_os_version()
85			.or_else(sysinfo::System::name)
86			.unwrap_or_else(|| std::env::consts::OS.to_owned());
87		format!("{os}; {}", sysinfo::System::cpu_arch())
88	});
89	format!("{product}/{version} ({os_comment})")
90}
91
92/// A [`reqwest::ClientBuilder`] carrying the `bestool` [`user_agent`] for `version`.
93///
94/// Convenience for callers that don't need any extra client config; suitable as
95/// the base of a [`ClientBuilderFactory`].
96pub fn client_builder(version: &str) -> reqwest::ClientBuilder {
97	reqwest::Client::builder().user_agent(user_agent("bestool", version))
98}
99
/// Probe the canopy tailnet endpoint, returning a client routed to it if
/// reachable.
///
/// This is the public entry point to the same probe [`CanopyClient`] runs
/// internally: it first tries the addresses returned by tailscale DNS, then
/// the hardcoded canopy tailscale IPs.
///
/// The returned client carries the same DNS / hardcoded-IP resolution override
/// the reporting client uses and presents **no** client certificate — callers
/// reaching canopy this way authenticate by tailnet identity. Returns `None`
/// when the tailnet endpoint isn't reachable, so callers can fall back to
/// public mTLS.
///
/// `make_builder` is invoked for every probe attempt to obtain a fresh base
/// [`reqwest::ClientBuilder`].
pub async fn tailscale_client(make_builder: &ClientBuilderFactory) -> Option<reqwest::Client> {
	probe_tailscale(make_builder).await
}
111
/// Severities accepted by the canopy `/events` API.
///
/// Canopy narrowed its vocabulary from RFC 5424 to this five-level set; the
/// retired syslog severities (`emergency`, `alert`, `notice`) are rejected by
/// the strict enum validation on `POST /events`.
///
/// On the wire each variant is its lowercase name (`"critical"`, `"error"`,
/// `"warning"`, `"info"`, `"debug"`), for both serialisation and
/// deserialisation.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
	Critical,
	Error,
	Warning,
	Info,
	Debug,
}
126
/// Payload for posting to `POST /events` on a canopy server.
///
/// All string fields borrow from the caller for the lifetime `'a`. Optional
/// fields that are `None` are left out of the serialised JSON entirely rather
/// than being sent as `null`.
#[derive(Debug, Clone, Serialize)]
pub struct NewEvent<'a> {
	/// Sent as `source`. NOTE(review): presumably identifies the component
	/// raising the event — confirm against the canopy API.
	pub source: &'a str,
	/// Sent under the JSON key `ref`; a raw identifier is needed because `ref`
	/// is a Rust keyword.
	#[serde(rename = "ref")]
	pub r#ref: &'a str,
	/// Sent as `message`.
	pub message: &'a str,
	/// Sent as `description`; omitted when `None`.
	#[serde(skip_serializing_if = "Option::is_none")]
	pub description: Option<&'a str>,
	/// Sent as `severity` using the lowercase [`Severity`] name; omitted when
	/// `None`.
	#[serde(skip_serializing_if = "Option::is_none")]
	pub severity: Option<Severity>,
	/// Sent under the camelCase key `occurredAt`; omitted when `None`.
	#[serde(rename = "occurredAt", skip_serializing_if = "Option::is_none")]
	pub occurred_at: Option<Timestamp>,
	/// Sent as `active`; omitted when `None`. NOTE(review): presumably marks
	/// whether the condition is still ongoing — confirm against the canopy API.
	#[serde(skip_serializing_if = "Option::is_none")]
	pub active: Option<bool>,
}
143
/// HTTP client with auth configured for talking to a canopy server.
///
/// Tries two auth paths in order of preference:
/// 1. **Tailscale**: if the canopy tailnet endpoint is reachable, plain HTTPS
///    works (auth is implicit via tailscale identity).
/// 2. **mTLS**: a fresh self-signed cert from the device key, short-lived
///    ([`CERT_VALIDITY_DAYS`]); for long-running daemons, [`Self::renew`]
///    should tick on [`CERT_RENEW_AFTER`] to swap in a fresh cert before expiry.
///
/// [`Self::refresh`] re-probes tailscale and swaps modes on reload.
///
/// All methods take `&self`; the active HTTP client lives behind an async
/// [`RwLock`] so it can be swapped while the client is shared.
pub struct CanopyClient {
	/// Device key PEM, kept so the mTLS client can be rebuilt on
	/// [`Self::refresh`] / [`Self::renew`]. `None` when the client was
	/// constructed without a device key, in which case only the tailscale
	/// path is available. Wrapped in [`Redacted`].
	device_key: Option<Redacted<String>>,
	/// Tamanu version of the install this client speaks for. Sent verbatim in
	/// the `X-Version` request header — canopy rejects events / status pushes
	/// that don't carry one. Sourced from the running Tamanu install's
	/// `package.json` (via `find_tamanu`); not the bestool / alertd version.
	tamanu_version: String,
	/// Produces the base client builder; see [`ClientBuilderFactory`].
	make_builder: ClientBuilderFactory,
	/// Currently active auth path together with its HTTP client. Requests take
	/// a read lock just long enough to clone the client and pick the URL;
	/// refresh / renew take the write lock to swap it.
	state: RwLock<State>,
}
165
166enum State {
167	Tailscale(reqwest::Client),
168	Mtls(reqwest::Client),
169}
170
171impl State {
172	fn is_tailscale(&self) -> bool {
173		matches!(self, State::Tailscale(_))
174	}
175
176	fn http(&self) -> reqwest::Client {
177		match self {
178			State::Tailscale(http) | State::Mtls(http) => http.clone(),
179		}
180	}
181}
182
183impl fmt::Debug for CanopyClient {
184	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185		f.debug_struct("CanopyClient").finish_non_exhaustive()
186	}
187}
188
189impl CanopyClient {
190	/// Build a canopy client, preferring tailscale and falling back to mTLS.
191	///
192	/// Probes the tailscale canopy endpoint first; if reachable, uses it.
193	/// Otherwise, if a device key PEM is provided, builds an mTLS client.
194	/// Returns `Ok(None)` if neither path is available.
195	///
196	/// `tamanu_version` is the version of the Tamanu install this client
197	/// speaks for; sent on every request via the `X-Version` header.
198	///
199	/// `make_builder` supplies the base [`reqwest::ClientBuilder`] — see
200	/// [`ClientBuilderFactory`]. Use [`client_builder`] for a sensible default.
201	pub async fn new(
202		tamanu_version: impl Into<String>,
203		device_key_pem: Option<&str>,
204		make_builder: impl Fn() -> reqwest::ClientBuilder + Send + Sync + 'static,
205	) -> Result<Option<Self>> {
206		let tamanu_version = tamanu_version.into();
207		let device_key = device_key_pem.map(|s| Redacted(s.to_owned()));
208		let make_builder: ClientBuilderFactory = Arc::new(make_builder);
209
210		if let Some(http) = probe_tailscale(&make_builder).await {
211			debug!("canopy: tailscale endpoint reachable, preferring it");
212			return Ok(Some(Self {
213				device_key,
214				tamanu_version,
215				make_builder,
216				state: RwLock::new(State::Tailscale(http)),
217			}));
218		}
219
220		if let Some(pem) = device_key_pem {
221			debug!("canopy: tailscale unreachable, falling back to mTLS");
222			let http = build_mtls_http(&make_builder, pem)?;
223			return Ok(Some(Self {
224				device_key,
225				tamanu_version,
226				make_builder,
227				state: RwLock::new(State::Mtls(http)),
228			}));
229		}
230
231		Ok(None)
232	}
233
234	/// Returns true if the client is currently using the tailscale path.
235	pub async fn is_tailscale(&self) -> bool {
236		self.state.read().await.is_tailscale()
237	}
238
239	/// Re-probe tailscale and swap modes if the picture has changed.
240	///
241	/// Intended to be called when the daemon receives a reload signal.
242	pub async fn refresh(&self) -> Result<()> {
243		if let Some(http) = probe_tailscale(&self.make_builder).await {
244			let mut state = self.state.write().await;
245			if !state.is_tailscale() {
246				debug!("canopy refresh: switching to tailscale path");
247			}
248			*state = State::Tailscale(http);
249			return Ok(());
250		}
251
252		if let Some(pem) = &self.device_key {
253			let http = build_mtls_http(&self.make_builder, &pem.0)?;
254			let mut state = self.state.write().await;
255			if state.is_tailscale() {
256				debug!("canopy refresh: tailscale dropped, falling back to mTLS");
257			}
258			*state = State::Mtls(http);
259			return Ok(());
260		}
261
262		debug!("canopy refresh: no auth path available, keeping current state");
263		Ok(())
264	}
265
266	/// Rebuild the underlying HTTP client with a fresh certificate.
267	///
268	/// No-op in tailscale mode (no cert to rotate). In mTLS mode, atomically
269	/// replaces the live client; in-flight requests continue with the old
270	/// client until they complete.
271	pub async fn renew(&self) -> Result<()> {
272		let Some(pem) = &self.device_key else {
273			return Ok(());
274		};
275		let mut state = self.state.write().await;
276		if state.is_tailscale() {
277			return Ok(());
278		}
279		*state = State::Mtls(build_mtls_http(&self.make_builder, &pem.0)?);
280		Ok(())
281	}
282
283	/// POST a status snapshot to the canopy server.
284	///
285	/// In tailscale mode, `base_url` is ignored and a `{TAILSCALE_URL}/public/status/{server_id}`
286	/// URL is used. In mTLS mode, posts to `{base_url}/status/{server_id}`.
287	///
288	/// The payload is free-form JSON; the canopy `/status` contract reserves the
289	/// top-level `health: []` key, whose entries each carry a `result` of
290	/// `passed | warning | failed | broken | skipped`. The body is gzip-encoded
291	/// with `Content-Encoding: gzip`.
292	///
293	/// Returns `backup_now`: the backup-type names canopy says this server should
294	/// back up right now (operator one-offs + schedule-due). Empty means nothing
295	/// to do. A response that predates the field (no `backup_now`) yields an empty
296	/// list, so older canopy deployments keep working.
297	pub async fn post_status(
298		&self,
299		base_url: &Url,
300		server_id: &str,
301		payload: &serde_json::Value,
302	) -> Result<Vec<String>> {
303		let (http, url) = {
304			let state = self.state.read().await;
305			let url = match &*state {
306				State::Tailscale(_) => format!("{TAILSCALE_URL}/public/status/{server_id}")
307					.parse::<Url>()
308					.into_diagnostic()
309					.wrap_err("building tailscale /public/status URL")?,
310				State::Mtls(_) => base_url
311					.join(&format!("/status/{server_id}"))
312					.into_diagnostic()
313					.wrap_err("building /status URL")?,
314			};
315			(state.http(), url)
316		};
317
318		let raw = serde_json::to_vec(payload)
319			.into_diagnostic()
320			.wrap_err("serialising canopy /status payload")?;
321		let compressed = gzip_bytes(&raw)
322			.into_diagnostic()
323			.wrap_err("gzipping canopy /status payload")?;
324
325		debug!(
326			%url,
327			raw_bytes = raw.len(),
328			gzip_bytes = compressed.len(),
329			"posting status snapshot to canopy",
330		);
331
332		let response = http
333			.post(url)
334			.header("X-Version", &self.tamanu_version)
335			.header(reqwest::header::CONTENT_TYPE, "application/json")
336			.header(reqwest::header::CONTENT_ENCODING, "gzip")
337			.body(compressed)
338			.send()
339			.await
340			.into_diagnostic()
341			.wrap_err("posting status to canopy")?;
342
343		let status = response.status();
344		if !status.is_success() {
345			let body = response.text().await.unwrap_or_default();
346			return Err(miette::miette!("canopy /status returned {status}: {body}"));
347		}
348
349		#[derive(Deserialize, Default)]
350		struct StatusResponseTail {
351			#[serde(default)]
352			backup_now: Vec<String>,
353		}
354
355		// The response flattens the persisted Status plus `backup_now`; we read
356		// only the latter and ignore the rest. A body that fails to parse (or
357		// predates the field) is treated as "nothing to do" rather than failing
358		// the status push.
359		let tail = response
360			.json::<StatusResponseTail>()
361			.await
362			.unwrap_or_default();
363		Ok(tail.backup_now)
364	}
365
366	/// GET a path on the canopy server, routed via tailscale when available.
367	///
368	/// In tailscale mode, the request goes to `{TAILSCALE_URL}{tailscale_path}`
369	/// (typically `/public/...`, the only mount that accepts tagged-device
370	/// tailscale callers). In mTLS mode, the request goes to `{base_url}{mtls_path}`.
371	///
372	/// Returns the raw response — the caller is responsible for status checks
373	/// and body parsing so they can choose how to fall back if the response
374	/// isn't usable.
375	pub async fn get(
376		&self,
377		base_url: &Url,
378		tailscale_path: &str,
379		mtls_path: &str,
380	) -> Result<reqwest::Response> {
381		let (http, url) = {
382			let state = self.state.read().await;
383			let url = match &*state {
384				State::Tailscale(_) => format!("{TAILSCALE_URL}{tailscale_path}")
385					.parse::<Url>()
386					.into_diagnostic()
387					.wrap_err("building tailscale GET URL")?,
388				State::Mtls(_) => base_url
389					.join(mtls_path)
390					.into_diagnostic()
391					.wrap_err("building mTLS GET URL")?,
392			};
393			(state.http(), url)
394		};
395
396		debug!(%url, "GET via canopy");
397		http.get(url)
398			.header("X-Version", &self.tamanu_version)
399			.send()
400			.await
401			.into_diagnostic()
402			.wrap_err("GET via canopy")
403	}
404
405	/// POST an event to the canopy server.
406	///
407	/// In tailscale mode, `base_url` is ignored and [`TAILSCALE_URL`] is used.
408	/// In mTLS mode, posts to `{base_url}/events`.
409	pub async fn post_event(&self, base_url: &Url, event: NewEvent<'_>) -> Result<()> {
410		let (http, url) = {
411			let state = self.state.read().await;
412			let url = match &*state {
413				State::Tailscale(_) => format!("{TAILSCALE_URL}/public/events")
414					.parse::<Url>()
415					.into_diagnostic()
416					.wrap_err("building tailscale /public/events URL")?,
417				State::Mtls(_) => base_url
418					.join("/events")
419					.into_diagnostic()
420					.wrap_err("building /events URL")?,
421			};
422			(state.http(), url)
423		};
424
425		debug!(
426			%url,
427			source = event.source,
428			r#ref = event.r#ref,
429			active = ?event.active,
430			"posting event to canopy"
431		);
432
433		let response = http
434			.post(url)
435			.header("X-Version", &self.tamanu_version)
436			.json(&event)
437			.send()
438			.await
439			.into_diagnostic()
440			.wrap_err("posting event to canopy")?;
441
442		let status = response.status();
443		if !status.is_success() {
444			let body = response.text().await.unwrap_or_default();
445			return Err(miette::miette!("canopy /events returned {status}: {body}"));
446		}
447
448		Ok(())
449	}
450
451	/// Resolve an endpoint URL for the current auth path.
452	///
453	/// `path` is the mTLS-mode path (e.g. `/backup-target`); over tailscale the
454	/// same endpoint is mounted under `/public`, so this prepends it.
455	async fn endpoint_url(&self, base_url: &Url, path: &str) -> Result<(reqwest::Client, Url)> {
456		let state = self.state.read().await;
457		let url = match &*state {
458			State::Tailscale(_) => format!("{TAILSCALE_URL}/public{path}")
459				.parse::<Url>()
460				.into_diagnostic()
461				.wrap_err_with(|| format!("building tailscale /public{path} URL"))?,
462			State::Mtls(_) => base_url
463				.join(path)
464				.into_diagnostic()
465				.wrap_err_with(|| format!("building {path} URL"))?,
466		};
467		Ok((state.http(), url))
468	}
469
470	/// Start a request to an arbitrary canopy endpoint on the current auth path.
471	///
472	/// This is the generic escape hatch behind the typed endpoint methods: it
473	/// resolves the right HTTP client and URL for the active auth mode, begins
474	/// the request, and sets the `X-Version` header. The returned builder is
475	/// yours to finish — add query params, a body, or extra headers, then
476	/// `.send()` and parse the response however suits.
477	///
478	/// `path` is the mTLS-mode path (e.g. `/backup-target`); over tailscale the
479	/// same endpoint is mounted under `/public`, so this routes it there, the
480	/// same convention the other endpoint methods follow.
481	pub async fn request(
482		&self,
483		method: reqwest::Method,
484		base_url: &Url,
485		path: &str,
486	) -> Result<reqwest::RequestBuilder> {
487		let (http, url) = self.endpoint_url(base_url, path).await?;
488		debug!(%url, %method, "arbitrary canopy request");
489		Ok(http
490			.request(method, url)
491			.header("X-Version", &self.tamanu_version))
492	}
493
494	/// Call an arbitrary canopy endpoint and parse its JSON response.
495	///
496	/// Builds the request via [`Self::request`], attaches `body` as JSON when
497	/// it's `Some`, sends it, and on a 2xx response parses the body into `Res`.
498	/// A non-success status becomes an error carrying the status and response
499	/// body, matching the other endpoint methods. This absorbs the status-check
500	/// and parse boilerplate.
501	///
502	/// Use [`serde_json::Value`] for `Res` (and/or the body) for fully dynamic
503	/// calls, or any concrete type for typed calls. When passing no body, pin
504	/// the inference with a turbofish, e.g. `None::<&()>`.
505	///
506	/// `path` follows the same mTLS/tailscale convention as [`Self::request`].
507	pub async fn request_json<Res: serde::de::DeserializeOwned>(
508		&self,
509		method: reqwest::Method,
510		base_url: &Url,
511		path: &str,
512		body: Option<&(impl serde::Serialize + ?Sized)>,
513	) -> Result<Res> {
514		let mut req = self.request(method, base_url, path).await?;
515		if let Some(body) = body {
516			req = req.json(body);
517		}
518
519		let response = req
520			.send()
521			.await
522			.into_diagnostic()
523			.wrap_err_with(|| format!("calling canopy {path}"))?;
524
525		let status = response.status();
526		if !status.is_success() {
527			let body = response.text().await.unwrap_or_default();
528			return Err(miette::miette!("canopy {path} returned {status}: {body}"));
529		}
530
531		response
532			.json::<Res>()
533			.await
534			.into_diagnostic()
535			.wrap_err_with(|| format!("parsing canopy {path} response"))
536	}
537
538	/// Register the backup types this server can run (`POST /backup-capabilities`).
539	pub async fn backup_capabilities(&self, base_url: &Url, types: &[String]) -> Result<()> {
540		let (http, url) = self.endpoint_url(base_url, "/backup-capabilities").await?;
541		debug!(%url, ?types, "registering backup capabilities with canopy");
542		let response = http
543			.post(url)
544			.header("X-Version", &self.tamanu_version)
545			.json(&CapabilitiesRequest { types })
546			.send()
547			.await
548			.into_diagnostic()
549			.wrap_err("posting backup capabilities to canopy")?;
550
551		let status = response.status();
552		if !status.is_success() {
553			let body = response.text().await.unwrap_or_default();
554			return Err(miette::miette!(
555				"canopy /backup-capabilities returned {status}: {body}"
556			));
557		}
558		Ok(())
559	}
560
561	/// Obtain short-lived S3 credentials for a backup type (`POST /backup-credentials`).
562	///
563	/// Returns the `credential_process`-shaped creds; the caller translates them
564	/// to the container-creds shape for kopia. `412`/`409`/`502` surface as errors.
565	pub async fn backup_credentials(
566		&self,
567		base_url: &Url,
568		backup_type: &str,
569		purpose: Purpose,
570	) -> Result<BackupCredentials> {
571		let (http, url) = self.endpoint_url(base_url, "/backup-credentials").await?;
572		debug!(%url, backup_type, ?purpose, "requesting backup credentials from canopy");
573		let response = http
574			.post(url)
575			.header("X-Version", &self.tamanu_version)
576			.json(&BackupCredentialsRequest {
577				r#type: backup_type,
578				purpose,
579			})
580			.send()
581			.await
582			.into_diagnostic()
583			.wrap_err("posting backup credentials request to canopy")?;
584
585		let status = response.status();
586		if !status.is_success() {
587			let body = response.text().await.unwrap_or_default();
588			return Err(miette::miette!(
589				"canopy /backup-credentials returned {status}: {body}"
590			));
591		}
592		response
593			.json::<BackupCredentials>()
594			.await
595			.into_diagnostic()
596			.wrap_err("parsing backup credentials from canopy")
597	}
598
599	/// Fetch the S3 repo target (`GET /backup-target`).
600	///
601	/// `412`/`409` mean the device isn't yet authorised for backups; these map to
602	/// [`TargetOutcome::Dormant`] (a benign idle state) rather than an error.
603	pub async fn backup_target(&self, base_url: &Url) -> Result<TargetOutcome> {
604		let (http, url) = self.endpoint_url(base_url, "/backup-target").await?;
605		debug!(%url, "fetching backup target from canopy");
606		let response = http
607			.get(url)
608			.header("X-Version", &self.tamanu_version)
609			.send()
610			.await
611			.into_diagnostic()
612			.wrap_err("fetching backup target from canopy")?;
613
614		let status = response.status();
615		if status == reqwest::StatusCode::PRECONDITION_FAILED
616			|| status == reqwest::StatusCode::CONFLICT
617		{
618			return Ok(TargetOutcome::Dormant);
619		}
620		if !status.is_success() {
621			let body = response.text().await.unwrap_or_default();
622			return Err(miette::miette!(
623				"canopy /backup-target returned {status}: {body}"
624			));
625		}
626		let target = response
627			.json::<BackupTarget>()
628			.await
629			.into_diagnostic()
630			.wrap_err("parsing backup target from canopy")?;
631		Ok(TargetOutcome::Ready(target))
632	}
633
634	/// Report a completed backup/restore run (`POST /backup-report`).
635	pub async fn backup_report(&self, base_url: &Url, report: &BackupReport<'_>) -> Result<()> {
636		let (http, url) = self.endpoint_url(base_url, "/backup-report").await?;
637		debug!(%url, run_id = report.run_id, "reporting backup outcome to canopy");
638		let response = http
639			.post(url)
640			.header("X-Version", &self.tamanu_version)
641			.json(report)
642			.send()
643			.await
644			.into_diagnostic()
645			.wrap_err("posting backup report to canopy")?;
646
647		let status = response.status();
648		if !status.is_success() {
649			let body = response.text().await.unwrap_or_default();
650			return Err(miette::miette!(
651				"canopy /backup-report returned {status}: {body}"
652			));
653		}
654		Ok(())
655	}
656
657	/// Register the restore intents this consumer supports (`POST /restore-capabilities`).
658	///
659	/// Replaces the registered intent set wholesale. Canopy dispatches only
660	/// matching worklist entries.
661	pub async fn restore_capabilities(&self, base_url: &Url, intents: &[&str]) -> Result<()> {
662		let (http, url) = self.endpoint_url(base_url, "/restore-capabilities").await?;
663		debug!(%url, ?intents, "registering restore capabilities with canopy");
664		let response = http
665			.post(url)
666			.header("X-Version", &self.tamanu_version)
667			.json(&RestoreCapabilitiesRequest { intents })
668			.send()
669			.await
670			.into_diagnostic()
671			.wrap_err("posting restore capabilities to canopy")?;
672
673		let status = response.status();
674		if !status.is_success() {
675			let body = response.text().await.unwrap_or_default();
676			return Err(miette::miette!(
677				"canopy /restore-capabilities returned {status}: {body}"
678			));
679		}
680		Ok(())
681	}
682
683	/// Fetch the desired-state worklist of replicas to restore (`GET /restore-worklist`).
684	pub async fn restore_worklist(&self, base_url: &Url) -> Result<Vec<WorklistEntry>> {
685		let (http, url) = self.endpoint_url(base_url, "/restore-worklist").await?;
686		debug!(%url, "fetching restore worklist from canopy");
687		let response = http
688			.get(url)
689			.header("X-Version", &self.tamanu_version)
690			.send()
691			.await
692			.into_diagnostic()
693			.wrap_err("fetching restore worklist from canopy")?;
694
695		let status = response.status();
696		if !status.is_success() {
697			let body = response.text().await.unwrap_or_default();
698			return Err(miette::miette!(
699				"canopy /restore-worklist returned {status}: {body}"
700			));
701		}
702		response
703			.json::<Vec<WorklistEntry>>()
704			.await
705			.into_diagnostic()
706			.wrap_err("parsing restore worklist from canopy")
707	}
708
709	/// Obtain read-only S3 credentials and the repo password for a group
710	/// (`POST /restore-credentials`).
711	///
712	/// Creds are 1-hour chained STS; refresh by re-calling. `403`/`409`/`502`
713	/// surface as errors.
714	pub async fn restore_credentials(
715		&self,
716		base_url: &Url,
717		backup_type: &str,
718		group: Uuid,
719	) -> Result<RestoreCredentials> {
720		let (http, url) = self.endpoint_url(base_url, "/restore-credentials").await?;
721		debug!(%url, backup_type, %group, "requesting restore credentials from canopy");
722		let response = http
723			.post(url)
724			.header("X-Version", &self.tamanu_version)
725			.json(&RestoreCredentialsRequest {
726				group,
727				r#type: backup_type,
728			})
729			.send()
730			.await
731			.into_diagnostic()
732			.wrap_err("posting restore credentials request to canopy")?;
733
734		let status = response.status();
735		if !status.is_success() {
736			let body = response.text().await.unwrap_or_default();
737			return Err(miette::miette!(
738				"canopy /restore-credentials returned {status}: {body}"
739			));
740		}
741		response
742			.json::<RestoreCredentials>()
743			.await
744			.into_diagnostic()
745			.wrap_err("parsing restore credentials from canopy")
746	}
747
748	/// Report a restore's health (`POST /restore-verification`).
749	pub async fn restore_verification(
750		&self,
751		base_url: &Url,
752		report: &RestoreVerification<'_>,
753	) -> Result<()> {
754		let (http, url) = self.endpoint_url(base_url, "/restore-verification").await?;
755		debug!(%url, group = %report.group, "reporting restore verification to canopy");
756		let response = http
757			.post(url)
758			.header("X-Version", &self.tamanu_version)
759			.json(report)
760			.send()
761			.await
762			.into_diagnostic()
763			.wrap_err("posting restore verification to canopy")?;
764
765		let status = response.status();
766		if !status.is_success() {
767			let body = response.text().await.unwrap_or_default();
768			return Err(miette::miette!(
769				"canopy /restore-verification returned {status}: {body}"
770			));
771		}
772		Ok(())
773	}
774}
775
776/// Probe the tailscale canopy endpoint.
777///
778/// Returns a configured `reqwest::Client` if `GET /public/servers` responds
779/// 2xx — anything else (timeout, non-2xx, transport error) returns `None` so
780/// the caller can fall back to mTLS.
781///
782/// Tries two paths in order:
783/// 1. Resolve `canopy` via the tailscale DNS server (100.100.100.100) and
784///    probe with those addresses.
785/// 2. Use hardcoded tailscale IPs for canopy and probe with those.
786///
787/// `/public/servers` is used because:
788/// - it lives under `/public/...`, the only mount that accepts tagged-device
789///   tailscale callers (everything else 403s with `tagged-device-not-allowed`);
790/// - it's a `GET` with no body, no `VersionHeader` requirement, and no auth;
791/// - it's read-only, so probing it has no side effects.
792async fn probe_tailscale(make_builder: &ClientBuilderFactory) -> Option<reqwest::Client> {
793	let dns_addrs: Vec<SocketAddr> = tailscale_resolver()
794		.lookup_ip("canopy")
795		.await
796		.ok()
797		.map(|addrs| addrs.iter().map(|ip| SocketAddr::new(ip, 443)).collect())
798		.unwrap_or_default();
799	if !dns_addrs.is_empty()
800		&& let Some(client) = try_probe(&dns_addrs, make_builder).await
801	{
802		return Some(client);
803	}
804
805	let hardcoded = [
806		SocketAddr::new(IpAddr::V4(CANOPY_HARDCODED_V4), 443),
807		SocketAddr::new(IpAddr::V6(CANOPY_HARDCODED_V6), 443),
808	];
809	debug!(
810		?hardcoded,
811		"canopy tailscale DNS lookup empty or probe failed, trying hardcoded IPs"
812	);
813	try_probe(&hardcoded, make_builder).await
814}
815
816async fn try_probe(
817	addrs: &[SocketAddr],
818	make_builder: &ClientBuilderFactory,
819) -> Option<reqwest::Client> {
820	let client = make_builder()
821		.timeout(TAILSCALE_PROBE_TIMEOUT)
822		.resolve_to_addrs(TAILSCALE_HOST, addrs)
823		.build()
824		.ok()?;
825
826	let url = format!("{TAILSCALE_URL}/public/servers");
827	match client.get(&url).send().await {
828		Ok(resp) if resp.status().is_success() => Some(client),
829		Ok(resp) => {
830			debug!(status = %resp.status(), ?addrs, "canopy tailscale probe: unexpected status");
831			None
832		}
833		Err(err) => {
834			debug!(?addrs, "canopy tailscale probe failed: {err}");
835			None
836		}
837	}
838}
839
840fn tailscale_resolver() -> Resolver<impl ConnectionProvider> {
841	Resolver::builder_with_config(
842		ResolverConfig::from_parts(
843			None,
844			vec!["tail53aef.ts.net.".parse().unwrap()],
845			vec![NameServerConfig::new(
846				"100.100.100.100".parse().unwrap(),
847				true,
848				vec![ConnectionConfig::udp()],
849			)],
850		),
851		TokioRuntimeProvider::default(),
852	)
853	.build()
854	.expect("tailscale resolver config is hardcoded and cannot fail to build")
855}
856
857fn gzip_bytes(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
858	let mut encoder = GzEncoder::new(Vec::with_capacity(bytes.len() / 2), Compression::default());
859	encoder.write_all(bytes)?;
860	encoder.finish()
861}
862
863/// Build a short-lived self-signed client certificate from a P-256 device key
864/// PEM and wrap it as a reqwest mTLS [`Identity`].
865///
866/// Canopy identifies a device by its certificate's public key (SPKI), not by a
867/// CA chain, so a fresh self-signed cert from the device key is all that's
868/// needed. The same device key drives both the long-running canopy client here
869/// and the one-shot `canopy register` enrollment handshake, so they present the
870/// same identity to canopy.
871pub fn device_identity(device_key_pem: &str) -> Result<reqwest::Identity> {
872	let key_pair = KeyPair::from_pem(device_key_pem)
873		.into_diagnostic()
874		.wrap_err("parsing device key PEM")?;
875
876	let mut params = CertificateParams::new(vec!["device.local".into()])
877		.into_diagnostic()
878		.wrap_err("building certificate params")?;
879	params.distinguished_name = DistinguishedName::new();
880	params
881		.distinguished_name
882		.push(DnType::CommonName, "device.local");
883
884	let now = OffsetDateTime::now_utc();
885	params.not_before = now - TimeDuration::minutes(1);
886	params.not_after = now + TimeDuration::days(CERT_VALIDITY_DAYS);
887
888	let cert = params
889		.self_signed(&key_pair)
890		.into_diagnostic()
891		.wrap_err("self-signing certificate")?;
892
893	let mut combined = cert.pem();
894	combined.push('\n');
895	combined.push_str(&key_pair.serialize_pem());
896
897	reqwest::Identity::from_pem(combined.as_bytes())
898		.into_diagnostic()
899		.wrap_err("building reqwest TLS identity")
900}
901
902fn build_mtls_http(
903	make_builder: &ClientBuilderFactory,
904	device_key_pem: &str,
905) -> Result<reqwest::Client> {
906	let identity = device_identity(device_key_pem)?;
907
908	make_builder()
909		.identity(identity)
910		.use_rustls_tls()
911		.timeout(Duration::from_secs(30))
912		.build()
913		.into_diagnostic()
914		.wrap_err("building canopy HTTP client")
915}
916
917#[cfg(test)]
918mod tests {
919	use super::*;
920
921	const TEST_DEVICE_KEY: &str = "\
922-----BEGIN PRIVATE KEY-----
923MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgVvhzsYiidp38GYn1
924KxD5Wipc/h8lglVsy1UFZq/SZbGhRANCAAT2EsEq7xjeWVnim9XwdYXga/LBbppm
925fXLgamTYOa/w9n/Ta64fiYWmN54kEd0DgnflJDLtID321Zz6xswvK/VN
926-----END PRIVATE KEY-----";
927
928	fn test_factory() -> ClientBuilderFactory {
929		Arc::new(reqwest::Client::builder)
930	}
931
932	#[test]
933	fn build_mtls_http_from_p256_key() {
934		// Direct mTLS-path build, bypassing the async constructor / tailscale probe.
935		let result = build_mtls_http(&test_factory(), TEST_DEVICE_KEY);
936		assert!(result.is_ok(), "{:?}", result.err());
937	}
938
939	#[test]
940	fn build_mtls_http_fails_on_garbage_key() {
941		assert!(build_mtls_http(&test_factory(), "not a real PEM").is_err());
942	}
943
944	#[tokio::test]
945	async fn renew_with_mtls_state_swaps_in_fresh_client() {
946		// Construct an mTLS-state client directly (no network probe) and renew it.
947		let http = build_mtls_http(&test_factory(), TEST_DEVICE_KEY).unwrap();
948		let client = CanopyClient {
949			device_key: Some(Redacted(TEST_DEVICE_KEY.to_owned())),
950			tamanu_version: "2.54.2".into(),
951			make_builder: test_factory(),
952			state: RwLock::new(State::Mtls(http)),
953		};
954		client.renew().await.expect("renew should succeed");
955		assert!(!client.is_tailscale().await);
956	}
957
958	#[tokio::test]
959	async fn renew_is_noop_in_tailscale_mode() {
960		// Tailscale-state client with no device key — renew is a no-op.
961		let http = reqwest::Client::new();
962		let client = CanopyClient {
963			device_key: None,
964			tamanu_version: "2.54.2".into(),
965			make_builder: test_factory(),
966			state: RwLock::new(State::Tailscale(http)),
967		};
968		client.renew().await.expect("renew should be a no-op");
969		assert!(client.is_tailscale().await);
970	}
971
972	fn mtls_client_against(base: &str) -> (CanopyClient, Url) {
973		let http = build_mtls_http(&test_factory(), TEST_DEVICE_KEY).unwrap();
974		let client = CanopyClient {
975			device_key: Some(Redacted(TEST_DEVICE_KEY.to_owned())),
976			tamanu_version: "2.54.2".into(),
977			make_builder: test_factory(),
978			state: RwLock::new(State::Mtls(http)),
979		};
980		(client, base.parse().unwrap())
981	}
982
983	struct Captured {
984		request_line: String,
985		headers: String,
986		body: Vec<u8>,
987	}
988
989	/// Bind a loopback socket and answer exactly one HTTP request with
990	/// `response`, capturing the received request line, headers, and body.
991	fn serve_once(response: &'static str) -> (String, std::thread::JoinHandle<Captured>) {
992		use std::io::{Read, Write};
993		use std::net::TcpListener;
994
995		let listener = TcpListener::bind("127.0.0.1:0").unwrap();
996		let base = format!("http://{}", listener.local_addr().unwrap());
997		let handle = std::thread::spawn(move || {
998			let (mut stream, _) = listener.accept().unwrap();
999			let mut buf = Vec::new();
1000			let mut chunk = [0u8; 1024];
1001			let header_end = loop {
1002				if let Some(pos) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1003					break pos + 4;
1004				}
1005				let n = stream.read(&mut chunk).unwrap();
1006				if n == 0 {
1007					panic!("connection closed before headers were complete");
1008				}
1009				buf.extend_from_slice(&chunk[..n]);
1010			};
1011
1012			let head = String::from_utf8_lossy(&buf[..header_end]).into_owned();
1013			let content_length = head
1014				.lines()
1015				.find_map(|line| {
1016					let (name, value) = line.split_once(':')?;
1017					name.trim()
1018						.eq_ignore_ascii_case("content-length")
1019						.then(|| value.trim().parse::<usize>().ok())
1020						.flatten()
1021				})
1022				.unwrap_or(0);
1023
1024			let mut body = buf[header_end..].to_vec();
1025			while body.len() < content_length {
1026				let n = stream.read(&mut chunk).unwrap();
1027				if n == 0 {
1028					break;
1029				}
1030				body.extend_from_slice(&chunk[..n]);
1031			}
1032
1033			stream.write_all(response.as_bytes()).unwrap();
1034			stream.flush().unwrap();
1035
1036			let mut lines = head.lines();
1037			let request_line = lines.next().unwrap_or_default().to_owned();
1038			let headers = lines.collect::<Vec<_>>().join("\n");
1039			Captured {
1040				request_line,
1041				headers,
1042				body,
1043			}
1044		});
1045		(base, handle)
1046	}
1047
1048	#[derive(Debug, Deserialize, PartialEq)]
1049	struct Echo {
1050		ok: bool,
1051		who: String,
1052	}
1053
1054	#[tokio::test]
1055	async fn request_json_sends_version_and_body_and_parses_response() {
1056		let (base, handle) = serve_once(
1057			"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 26\r\n\r\n{\"ok\":true,\"who\":\"device\"}",
1058		);
1059		let (client, base_url) = mtls_client_against(&base);
1060
1061		let payload = serde_json::json!({ "hello": "world" });
1062		let got: Echo = client
1063			.request_json(reqwest::Method::POST, &base_url, "/thing", Some(&payload))
1064			.await
1065			.expect("request_json should succeed");
1066
1067		assert_eq!(
1068			got,
1069			Echo {
1070				ok: true,
1071				who: "device".into()
1072			}
1073		);
1074
1075		let captured = handle.join().unwrap();
1076		assert!(
1077			captured.request_line.starts_with("POST /thing "),
1078			"unexpected request line: {}",
1079			captured.request_line
1080		);
1081		assert!(
1082			captured
1083				.headers
1084				.to_ascii_lowercase()
1085				.contains("x-version: 2.54.2"),
1086			"missing X-Version header in:\n{}",
1087			captured.headers
1088		);
1089		let sent: serde_json::Value = serde_json::from_slice(&captured.body).unwrap();
1090		assert_eq!(sent, payload);
1091	}
1092
1093	#[tokio::test]
1094	async fn request_json_errors_on_non_success_with_body() {
1095		let (base, handle) =
1096			serve_once("HTTP/1.1 418 I'm a teapot\r\nContent-Length: 14\r\n\r\nno coffee here");
1097		let (client, base_url) = mtls_client_against(&base);
1098
1099		let err = client
1100			.request_json::<serde_json::Value>(
1101				reqwest::Method::GET,
1102				&base_url,
1103				"/brew",
1104				None::<&()>,
1105			)
1106			.await
1107			.expect_err("non-2xx should error");
1108		let msg = err.to_string();
1109		assert!(msg.contains("/brew"), "expected path in error: {msg}");
1110		assert!(msg.contains("418"), "expected status in error: {msg}");
1111		assert!(
1112			msg.contains("no coffee here"),
1113			"expected body text in error: {msg}"
1114		);
1115
1116		handle.join().unwrap();
1117	}
1118
1119	#[test]
1120	fn user_agent_has_product_and_os_comment() {
1121		let ua = user_agent("bestool", "1.2.3");
1122		assert!(
1123			ua.starts_with("bestool/1.2.3 "),
1124			"unexpected user-agent: {ua}"
1125		);
1126		assert!(ua.contains('('), "expected OS comment in: {ua}");
1127		assert!(ua.ends_with(')'), "expected OS comment in: {ua}");
1128		assert!(
1129			ua.contains(sysinfo::System::cpu_arch().as_str()),
1130			"expected arch in: {ua}"
1131		);
1132	}
1133
1134	#[test]
1135	fn gzip_bytes_roundtrips() {
1136		use flate2::read::GzDecoder;
1137		use std::io::Read;
1138
1139		let original = br#"{"health":[{"check":"x","result":"passed"}]}"#;
1140		let compressed = gzip_bytes(original).expect("gzip should succeed");
1141		assert!(
1142			compressed.starts_with(&[0x1f, 0x8b]),
1143			"expected gzip magic bytes"
1144		);
1145		let mut decoder = GzDecoder::new(&compressed[..]);
1146		let mut decompressed = Vec::new();
1147		decoder.read_to_end(&mut decompressed).unwrap();
1148		assert_eq!(decompressed, original);
1149	}
1150
1151	#[test]
1152	fn severity_serialises_lowercase() {
1153		assert_eq!(
1154			serde_json::to_string(&Severity::Warning).unwrap(),
1155			"\"warning\""
1156		);
1157		assert_eq!(
1158			serde_json::to_string(&Severity::Critical).unwrap(),
1159			"\"critical\""
1160		);
1161	}
1162
1163	#[test]
1164	fn new_event_omits_optional_fields() {
1165		let evt = NewEvent {
1166			source: "src",
1167			r#ref: "host/alert:tgt",
1168			message: "msg",
1169			description: None,
1170			severity: None,
1171			occurred_at: None,
1172			active: None,
1173		};
1174		let json = serde_json::to_string(&evt).unwrap();
1175		assert!(json.contains("\"source\":\"src\""));
1176		assert!(json.contains("\"ref\":\"host/alert:tgt\""));
1177		assert!(json.contains("\"message\":\"msg\""));
1178		assert!(!json.contains("description"));
1179		assert!(!json.contains("severity"));
1180		assert!(!json.contains("occurredAt"));
1181		assert!(!json.contains("active"));
1182	}
1183
1184	#[test]
1185	fn new_event_serialises_occurred_at_as_camel_case() {
1186		let evt = NewEvent {
1187			source: "src",
1188			r#ref: "ref",
1189			message: "msg",
1190			description: Some("desc"),
1191			severity: Some(Severity::Warning),
1192			occurred_at: Some("2025-01-01T00:00:00Z".parse().unwrap()),
1193			active: Some(true),
1194		};
1195		let json = serde_json::to_string(&evt).unwrap();
1196		assert!(json.contains("\"occurredAt\":"));
1197		assert!(json.contains("\"description\":\"desc\""));
1198		assert!(json.contains("\"severity\":\"warning\""));
1199		assert!(json.contains("\"active\":true"));
1200	}
1201}