mkit_core/protocol.rs
1//! Cross-transport types: error taxonomy, the [`Transport`] trait, the
2//! [`PackKey`] digest wrapper, and the retry/backoff helpers used by
3//! every transport implementation (memory, file, HTTP, S3, SSH).
4//!
5//! The SSH wire format is defined in `mkit-rpc`'s `ssh.proto` and
6//! lives in `mkit_rpc::mkit::rpc::v1::ssh`; transport-ssh consumes
7//! the schema directly. The hand-rolled `OP_HELLO` byte format that
8//! used to live in this module has been retired.
9
10// SPEC-TRANSPORT §7 calls out the exponential ladder in seconds
11// (1, 2, 4, …, 300). Expressing those values with `Duration::from_secs`
12// is deliberate — switching to `from_mins` loses the one-to-one match
13// with the spec text.
14#![allow(clippy::duration_suboptimal_units)]
15
16use core::fmt;
17use core::time::Duration;
18
19use crate::hash::{FromHexError, Hash, to_hex};
20use crate::refs::Ref;
21pub use crate::refs::RefWriteCondition;
22
23// ---------------------------------------------------------------------------
24// Error taxonomy
25// ---------------------------------------------------------------------------
26
27/// Errors that any transport may surface across the [`Transport`]
28/// boundary. Implementations MAY wrap transport-specific errors
29/// internally but MUST map them to one of these variants before
30/// returning.
31#[derive(Debug, thiserror::Error)]
32pub enum TransportError {
33 /// `download_pack` called on a digest the remote does not hold.
34 #[error("pack not found on remote")]
35 PackNotFound,
36 /// Authentication or ACL failure (HTTP 401/403, SSH auth refusal,
37 /// S3 `SignatureDoesNotMatch`, …).
38 #[error("access denied by remote")]
39 AccessDenied,
40 /// Catch-all remote-side failure carrying an advisory message. The
41 /// message is for operators; programs MUST NOT pattern-match on its
42 /// contents.
43 #[error("remote error: {0}")]
44 RemoteError(String),
45 /// `update_ref` CAS precondition was not satisfied. Per
46 /// SPEC-TRANSPORT §7, callers MUST treat this as
47 /// "possibly-success on retry" for `.missing` / `.match` and
48 /// confirm with `read_ref`.
49 #[error("ref CAS precondition failed")]
50 RefConflict,
51 /// Caller passed a ref name failing SPEC-REFS §3.
52 #[error("invalid ref name: {0}")]
53 InvalidRef(String),
54 /// Network-level failure: DNS, TCP connect, TLS handshake, SSH
55 /// subprocess spawn. Retryable (see [`is_retryable`]).
56 #[error("connection to remote failed")]
57 ConnectionFailed,
58 /// Unexpected HTTP status or transport-protocol error. 5xx and 429
59 /// are retryable; 4xx (except 401/403/404/409/412) is not.
60 #[error("server error (status {status})")]
61 ServerError {
62 /// Numeric status code. HTTP uses its native codes; transports
63 /// without a status integer use `0`.
64 status: u16,
65 },
66 /// Server response did not match the wire contract (truncated
67 /// frame, unknown opcode, bad JSON, …).
68 #[error("invalid response from remote")]
69 InvalidResponse,
70 /// Generic protocol-level failure — malformed frame, unexpected
71 /// opcode order, or failed handshake.
72 #[error("protocol error")]
73 ProtocolError,
74 /// Payload exceeded a transport-specific cap.
75 #[error("payload too large: {0} bytes")]
76 PayloadTooLarge(usize),
77 /// An insecure URL scheme (plain `http://`) was supplied for a
78 /// non-loopback host. Plain HTTP is restricted to loopback addresses
79 /// (`127.0.0.1`, `::1`, `localhost`) so production traffic is never
80 /// transported in the clear.
81 #[error("insecure scheme: plain http:// is allowed only for loopback hosts")]
82 InsecureScheme,
83}
84
85/// Result alias used throughout this module.
86pub type TransportResult<T> = Result<T, TransportError>;
87
88// ---------------------------------------------------------------------------
89// PackKey — 32-byte digest wrapper
90// ---------------------------------------------------------------------------
91
92/// A 32-byte pack digest used as the content-address for an uploaded
93/// pack. This is the same 32 bytes as [`Hash`](tyalias@Hash) but wrapped so pack
94/// digests and object hashes do not silently cross purposes at API
95/// boundaries.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
97pub struct PackKey(pub [u8; 32]);
98
99impl PackKey {
100 /// Build a [`PackKey`] from a raw 32-byte digest.
101 #[must_use]
102 pub const fn new(bytes: [u8; 32]) -> Self {
103 Self(bytes)
104 }
105
106 /// Borrow the underlying 32 bytes.
107 #[must_use]
108 pub const fn as_bytes(&self) -> &[u8; 32] {
109 &self.0
110 }
111
112 /// Lowercase 64-char hex.
113 #[must_use]
114 pub fn to_hex(&self) -> String {
115 to_hex(&self.0)
116 }
117
118 /// Build a [`PackKey`] from a [`Hash`](tyalias@Hash) (alias for [`From`]).
119 #[must_use]
120 pub const fn from_hash(h: Hash) -> Self {
121 Self(h)
122 }
123
124 /// Convert back to a plain [`Hash`](tyalias@Hash).
125 #[must_use]
126 pub const fn into_hash(self) -> Hash {
127 self.0
128 }
129}
130
131impl From<Hash> for PackKey {
132 fn from(h: Hash) -> Self {
133 Self(h)
134 }
135}
136
137impl From<PackKey> for Hash {
138 fn from(k: PackKey) -> Hash {
139 k.0
140 }
141}
142
143impl fmt::Display for PackKey {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 f.write_str(&self.to_hex())
146 }
147}
148
149/// Parse a [`PackKey`] from a 64-char lowercase hex string.
150///
151/// Accepts uppercase too (matches the permissive [`crate::hash::from_hex`]
152/// semantics); callers that require lowercase MUST validate the input
153/// independently.
154pub fn pack_key_from_hex(s: &str) -> Result<PackKey, FromHexError> {
155 let h = crate::hash::from_hex(s)?;
156 Ok(PackKey(h))
157}
158
159// ---------------------------------------------------------------------------
160// Retry / backoff
161// ---------------------------------------------------------------------------
162
163/// Return `true` if a transport should retry after seeing `err`.
164///
165/// Retryable per SPEC-TRANSPORT §7:
166/// - [`TransportError::ConnectionFailed`]
167/// - [`TransportError::ServerError`] with a 5xx status OR HTTP 429.
168///
169/// Explicitly non-retryable:
170/// - [`TransportError::PackNotFound`]
171/// - [`TransportError::AccessDenied`]
172/// - [`TransportError::RefConflict`] (CAS retry is a caller-level policy)
173/// - [`TransportError::InvalidRef`]
174/// - [`TransportError::InvalidResponse`] / [`TransportError::ProtocolError`]
175/// - [`TransportError::PayloadTooLarge`]
176/// - [`TransportError::RemoteError`] — the remote chose not to be specific;
177/// we do not guess.
178/// - [`TransportError::ServerError`] with any 4xx status.
179#[must_use]
180pub fn is_retryable(err: &TransportError) -> bool {
181 match err {
182 TransportError::ConnectionFailed => true,
183 TransportError::ServerError { status } => *status >= 500 || *status == 429,
184 _ => false,
185 }
186}
187
/// Number of attempts in the default backoff ladder.
///
/// SPEC-TRANSPORT §7: `attempt = 1; while attempt ≤ 5`.
pub const BACKOFF_MAX_ATTEMPTS: u32 = 5;

/// First delay of the default backoff ladder.
pub const BACKOFF_INITIAL: Duration = Duration::from_secs(1);

/// Ceiling applied to every individual delay of the default ladder.
pub const BACKOFF_CAP: Duration = Duration::from_secs(300);

/// Largest pack body any transport will ingest (HTTP `Content-Length`,
/// S3 `GetObject`, SSH `DownloadPackHeader.total_bytes`).
///
/// On 64-bit targets this is 4 GiB (`1 << 32` bytes), matching the
/// pack-format addressable range. Targets with narrower pointers cannot
/// address that much. There the limit falls back to `usize::MAX`, the
/// largest buffer they can represent, so the crate still compiles.
#[cfg(target_pointer_width = "64")]
pub const PACK_BODY_LIMIT: u64 = 1 << 32;
#[cfg(not(target_pointer_width = "64"))]
pub const PACK_BODY_LIMIT: u64 = usize::MAX as u64;

/// [`PACK_BODY_LIMIT`] as a `usize`, for capping `Vec`-backed buffers.
///
/// The narrowing cast below is checked at compile time by the following
/// `const` assertion, which rejects any target where it would lose bits.
#[allow(clippy::cast_possible_truncation)] // guarded by the const assertion below
pub const PACK_BODY_LIMIT_USIZE: usize = PACK_BODY_LIMIT as usize;
const _: () = assert!(
    PACK_BODY_LIMIT == PACK_BODY_LIMIT_USIZE as u64,
    "PACK_BODY_LIMIT does not fit in usize on this target",
);
217
218/// Exponential-backoff iterator used by all transports.
219///
220/// Yields `[1s, 2s, 4s, 8s, 16s]` (5 attempts) for the default ladder,
221/// doubling each step and capping at 300s. This is the ladder mandated
222/// by SPEC-TRANSPORT §7 for `ConnectionFailed`, 5xx, and HTTP 429.
223///
224/// The iterator is self-contained — it holds no reference to a clock,
225/// so it can be constructed in tests and exhaustively enumerated.
226#[derive(Debug, Clone)]
227pub struct BackoffIterator {
228 next_delay: Duration,
229 attempts_remaining: u32,
230 cap: Duration,
231}
232
233impl BackoffIterator {
234 /// Default ladder: 5 attempts, starting at 1s, doubling, capped at 300s.
235 #[must_use]
236 pub const fn new() -> Self {
237 Self {
238 next_delay: BACKOFF_INITIAL,
239 attempts_remaining: BACKOFF_MAX_ATTEMPTS,
240 cap: BACKOFF_CAP,
241 }
242 }
243
244 /// Custom ladder for tests.
245 #[must_use]
246 pub const fn with(initial: Duration, cap: Duration, attempts: u32) -> Self {
247 Self {
248 next_delay: initial,
249 attempts_remaining: attempts,
250 cap,
251 }
252 }
253}
254
255impl Default for BackoffIterator {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
261impl Iterator for BackoffIterator {
262 type Item = Duration;
263
264 fn next(&mut self) -> Option<Self::Item> {
265 if self.attempts_remaining == 0 {
266 return None;
267 }
268 self.attempts_remaining -= 1;
269 let current = self.next_delay;
270 let doubled = current.saturating_mul(2);
271 self.next_delay = if doubled > self.cap {
272 self.cap
273 } else {
274 doubled
275 };
276 Some(current)
277 }
278}
279
280// ---------------------------------------------------------------------------
281// Transport trait
282// ---------------------------------------------------------------------------
283
284/// The mkit transport vtable.
285///
286/// Every transport (memory, file, HTTP, S3, SSH) implements this trait.
287/// Methods are synchronous and take `&self`; transports that need
288/// interior mutability (e.g. connection pools) MUST use a `Mutex` /
289/// `RwLock` internally. This keeps the trait object-safe.
290///
291/// All implementations MUST honour the retry policy in
292/// SPEC-TRANSPORT §7 internally OR document that the caller is
293/// responsible — the abstract trait takes no position. The
294/// [`is_retryable`] and [`BackoffIterator`] helpers are provided for
295/// implementations that embed the policy.
296pub trait Transport: Send + Sync {
297 /// Upload a pack. The digest is computed by the caller (BLAKE3 of
298 /// the full pack bytes) and used as the object key — servers MAY
299 /// dedupe on this key.
300 fn upload_pack(&self, bytes: &[u8], key: &PackKey) -> TransportResult<()>;
301
302 /// Download a pack by its digest.
303 ///
304 /// Returns [`TransportError::PackNotFound`] if the remote does not
305 /// hold this digest.
306 fn download_pack(&self, key: &PackKey) -> TransportResult<Vec<u8>>;
307
308 /// HEAD-check a pack. Cheaper than [`Self::download_pack`] on
309 /// network transports.
310 fn pack_exists(&self, key: &PackKey) -> TransportResult<bool>;
311
312 /// Unconditional ref write — equivalent to
313 /// `update_ref(name, RefWriteCondition::Any, hash)`.
314 ///
315 /// Default impl delegates to [`Self::update_ref`] so transports only
316 /// implement one entry point.
317 fn write_ref(&self, name: &str, hash: &Hash) -> TransportResult<()> {
318 self.update_ref(name, RefWriteCondition::Any, hash)
319 }
320
321 /// CAS ref write. See [`RefWriteCondition`].
322 ///
323 /// On `.missing` / `.match` CAS failure, returns
324 /// [`TransportError::RefConflict`]. Callers retrying after a
325 /// timeout MUST follow up with [`Self::read_ref`] to confirm
326 /// whether the first attempt actually landed (SPEC-TRANSPORT §7).
327 fn update_ref(
328 &self,
329 name: &str,
330 condition: RefWriteCondition,
331 hash: &Hash,
332 ) -> TransportResult<()>;
333
334 /// Read the current value of a ref, or `None` if it does not exist.
335 fn read_ref(&self, name: &str) -> TransportResult<Option<Hash>>;
336
337 /// List refs whose full name starts with `prefix`. Returned names
338 /// have `prefix` stripped per SPEC-REFS §4. An empty prefix lists
339 /// every ref.
340 fn list_refs(&self, prefix: &str) -> TransportResult<Vec<Ref>>;
341}
342
343// ---------------------------------------------------------------------------
344// async_shim — sync/async bridge for transports that wrap an async cipher
345// ---------------------------------------------------------------------------
346
/// Bridge that lets a transport with an async cipher or I/O stack (e.g.
/// `commonware-stream::encrypted`) present the intentionally synchronous
/// [`Transport`] surface.
///
/// The bridge sits in `mkit-core`, next to the trait, because it is
/// generic infrastructure. Several transports, and the Phase 2
/// sparse-checkout work, plug into the same point. It pulls in no
/// executor of its own (`tokio`, `commonware-runtime`, …); the consumer
/// picks one.
///
/// # Why a trait
///
/// `mkit-transport-enc` (and, in Phase 2, `mkit-core::sparse`) must run
/// `async fn` bodies from sync methods. Baking in
/// `tokio::runtime::Handle::block_on` would drag tokio through the whole
/// workspace. Baking in `commonware_runtime::deterministic` would make
/// production run on the test runtime. Abstracting the `Executor` leaves
/// that choice to the consuming crate.
pub mod async_shim {
    use core::future::Future;

    /// Runs a future to completion from synchronous code.
    ///
    /// Implement it with whichever runtime fits: `tokio`,
    /// `commonware-runtime`'s deterministic runner (tests), or the planned
    /// production tokio runner. `mkit-core` needs no compile-time
    /// dependency on any of them.
    ///
    /// Implementations MUST respect the re-entrancy rules of the runtime
    /// they wrap. Calling `block_on` from inside a task already running on
    /// that same runtime will typically panic or deadlock. The contract is
    /// a sync external API over async internals, not arbitrary nested
    /// async-from-sync calls.
    pub trait Executor: Send + Sync {
        /// Park the calling thread until `fut` completes, then return its
        /// output.
        fn block_on<F, T>(&self, fut: F) -> T
        where
            T: Send,
            F: Future<Output = T> + Send;
    }
}
386
387// ---------------------------------------------------------------------------
388// Tests
389// ---------------------------------------------------------------------------
390
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pack_key_hex_roundtrip() {
        let pk = PackKey::new([0x42u8; 32]);
        let hex = pk.to_hex();
        assert_eq!(hex.len(), 64);
        assert_eq!(pack_key_from_hex(&hex).unwrap(), pk);
    }

    #[test]
    fn pack_key_display_and_conversions_agree() {
        let pk = PackKey::new([0x0fu8; 32]);
        // `to_hex` is documented as lowercase hex; Display must match it.
        assert_eq!(pk.to_hex(), "0f".repeat(32));
        assert_eq!(pk.to_string(), pk.to_hex());
        assert_eq!(pk.as_bytes(), &[0x0fu8; 32]);
        let h: Hash = pk.into();
        assert_eq!(PackKey::from(h), pk);
        assert_eq!(PackKey::from_hash(pk.into_hash()), pk);
    }

    #[test]
    fn is_retryable_matches_spec() {
        assert!(is_retryable(&TransportError::ConnectionFailed));
        for status in [429u16, 500, 502, 503, 504] {
            assert!(
                is_retryable(&TransportError::ServerError { status }),
                "status {status} should be retryable"
            );
        }
        for status in [0u16, 400, 401, 403, 404, 409, 412] {
            assert!(
                !is_retryable(&TransportError::ServerError { status }),
                "status {status} should not be retryable"
            );
        }
        let non_retryable = [
            TransportError::PackNotFound,
            TransportError::AccessDenied,
            TransportError::RefConflict,
            TransportError::RemoteError("busy".to_owned()),
            TransportError::InvalidRef("refs/bad".to_owned()),
            TransportError::InvalidResponse,
            TransportError::ProtocolError,
            TransportError::PayloadTooLarge(1),
            TransportError::InsecureScheme,
        ];
        for err in &non_retryable {
            assert!(!is_retryable(err), "{err:?} should not be retryable");
        }
    }

    #[test]
    fn backoff_default_ladder_is_1_2_4_8_16() {
        let delays: Vec<Duration> = BackoffIterator::new().collect();
        assert_eq!(
            delays,
            vec![
                Duration::from_secs(1),
                Duration::from_secs(2),
                Duration::from_secs(4),
                Duration::from_secs(8),
                Duration::from_secs(16),
            ]
        );
    }

    #[test]
    fn backoff_default_matches_new() {
        let via_default: Vec<Duration> = BackoffIterator::default().collect();
        let via_new: Vec<Duration> = BackoffIterator::new().collect();
        assert_eq!(via_default, via_new);
    }

    #[test]
    fn backoff_caps_at_max() {
        let cap = Duration::from_secs(10);
        let delays: Vec<Duration> = BackoffIterator::with(Duration::from_secs(8), cap, 5).collect();
        // 8s first; doubling to 16s would exceed the 10s cap, so every later
        // step is pinned exactly at the cap (not merely bounded by it).
        assert_eq!(delays, vec![Duration::from_secs(8), cap, cap, cap, cap]);
    }

    #[test]
    fn backoff_with_zero_attempts_is_empty() {
        let mut it = BackoffIterator::with(Duration::from_secs(1), Duration::from_secs(10), 0);
        assert_eq!(it.next(), None);
    }
}