// mkit_core/protocol.rs

//! Cross-transport types: the error taxonomy, the [`Transport`] trait, the
//! [`PackKey`] digest wrapper, and the retry/backoff helpers used by
//! every transport implementation (memory, file, HTTP, S3, SSH).
//!
//! The SSH wire format is defined in `mkit-rpc`'s `ssh.proto` and
//! lives in `mkit_rpc::mkit::rpc::v1::ssh`; transport-ssh consumes
//! that schema directly. The hand-rolled `OP_HELLO` byte format that
//! used to live in this module has been retired.
9
// SPEC-TRANSPORT §7 states the exponential ladder in seconds
// (1, 2, 4, …, 300). Writing those values with `Duration::from_secs`
// is deliberate: switching to `from_mins` would lose the one-to-one match
// with the spec text, so the lint suggesting larger units is silenced for
// this module.
#![allow(clippy::duration_suboptimal_units)]
15
16use core::fmt;
17use core::time::Duration;
18
19use crate::hash::{FromHexError, Hash, to_hex};
20use crate::refs::Ref;
21pub use crate::refs::RefWriteCondition;
22
23// ---------------------------------------------------------------------------
24// Error taxonomy
25// ---------------------------------------------------------------------------
26
27/// Errors that any transport may surface across the [`Transport`]
28/// boundary. Implementations MAY wrap transport-specific errors
29/// internally but MUST map them to one of these variants before
30/// returning.
31#[derive(Debug, thiserror::Error)]
32pub enum TransportError {
33    /// `download_pack` called on a digest the remote does not hold.
34    #[error("pack not found on remote")]
35    PackNotFound,
36    /// Authentication or ACL failure (HTTP 401/403, SSH auth refusal,
37    /// S3 `SignatureDoesNotMatch`, …).
38    #[error("access denied by remote")]
39    AccessDenied,
40    /// Catch-all remote-side failure carrying an advisory message. The
41    /// message is for operators; programs MUST NOT pattern-match on its
42    /// contents.
43    #[error("remote error: {0}")]
44    RemoteError(String),
45    /// `update_ref` CAS precondition was not satisfied. Per
46    /// SPEC-TRANSPORT §7, callers MUST treat this as
47    /// "possibly-success on retry" for `.missing` / `.match` and
48    /// confirm with `read_ref`.
49    #[error("ref CAS precondition failed")]
50    RefConflict,
51    /// Caller passed a ref name failing SPEC-REFS §3.
52    #[error("invalid ref name: {0}")]
53    InvalidRef(String),
54    /// Network-level failure: DNS, TCP connect, TLS handshake, SSH
55    /// subprocess spawn. Retryable (see [`is_retryable`]).
56    #[error("connection to remote failed")]
57    ConnectionFailed,
58    /// Unexpected HTTP status or transport-protocol error. 5xx and 429
59    /// are retryable; 4xx (except 401/403/404/409/412) is not.
60    #[error("server error (status {status})")]
61    ServerError {
62        /// Numeric status code. HTTP uses its native codes; transports
63        /// without a status integer use `0`.
64        status: u16,
65    },
66    /// Server response did not match the wire contract (truncated
67    /// frame, unknown opcode, bad JSON, …).
68    #[error("invalid response from remote")]
69    InvalidResponse,
70    /// Generic protocol-level failure — malformed frame, unexpected
71    /// opcode order, or failed handshake.
72    #[error("protocol error")]
73    ProtocolError,
74    /// Payload exceeded a transport-specific cap.
75    #[error("payload too large: {0} bytes")]
76    PayloadTooLarge(usize),
77    /// An insecure URL scheme (plain `http://`) was supplied for a
78    /// non-loopback host. Plain HTTP is restricted to loopback addresses
79    /// (`127.0.0.1`, `::1`, `localhost`) so production traffic is never
80    /// transported in the clear.
81    #[error("insecure scheme: plain http:// is allowed only for loopback hosts")]
82    InsecureScheme,
83}
84
85/// Result alias used throughout this module.
86pub type TransportResult<T> = Result<T, TransportError>;
87
88// ---------------------------------------------------------------------------
89// PackKey — 32-byte digest wrapper
90// ---------------------------------------------------------------------------
91
/// Content address of an uploaded pack: the raw 32-byte pack digest.
///
/// The bytes are the same shape as a [`Hash`](tyalias@Hash). Keeping them in
/// a dedicated newtype stops pack digests and object hashes from being
/// swapped silently at API boundaries.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PackKey(pub [u8; 32]);
98
99impl PackKey {
100    /// Build a [`PackKey`] from a raw 32-byte digest.
101    #[must_use]
102    pub const fn new(bytes: [u8; 32]) -> Self {
103        Self(bytes)
104    }
105
106    /// Borrow the underlying 32 bytes.
107    #[must_use]
108    pub const fn as_bytes(&self) -> &[u8; 32] {
109        &self.0
110    }
111
112    /// Lowercase 64-char hex.
113    #[must_use]
114    pub fn to_hex(&self) -> String {
115        to_hex(&self.0)
116    }
117
118    /// Build a [`PackKey`] from a [`Hash`](tyalias@Hash) (alias for [`From`]).
119    #[must_use]
120    pub const fn from_hash(h: Hash) -> Self {
121        Self(h)
122    }
123
124    /// Convert back to a plain [`Hash`](tyalias@Hash).
125    #[must_use]
126    pub const fn into_hash(self) -> Hash {
127        self.0
128    }
129}
130
131impl From<Hash> for PackKey {
132    fn from(h: Hash) -> Self {
133        Self(h)
134    }
135}
136
137impl From<PackKey> for Hash {
138    fn from(k: PackKey) -> Hash {
139        k.0
140    }
141}
142
143impl fmt::Display for PackKey {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.write_str(&self.to_hex())
146    }
147}
148
149/// Parse a [`PackKey`] from a 64-char lowercase hex string.
150///
151/// Accepts uppercase too (matches the permissive [`crate::hash::from_hex`]
152/// semantics); callers that require lowercase MUST validate the input
153/// independently.
154pub fn pack_key_from_hex(s: &str) -> Result<PackKey, FromHexError> {
155    let h = crate::hash::from_hex(s)?;
156    Ok(PackKey(h))
157}
158
159// ---------------------------------------------------------------------------
160// Retry / backoff
161// ---------------------------------------------------------------------------
162
163/// Return `true` if a transport should retry after seeing `err`.
164///
165/// Retryable per SPEC-TRANSPORT §7:
166/// - [`TransportError::ConnectionFailed`]
167/// - [`TransportError::ServerError`] with a 5xx status OR HTTP 429.
168///
169/// Explicitly non-retryable:
170/// - [`TransportError::PackNotFound`]
171/// - [`TransportError::AccessDenied`]
172/// - [`TransportError::RefConflict`] (CAS retry is a caller-level policy)
173/// - [`TransportError::InvalidRef`]
174/// - [`TransportError::InvalidResponse`] / [`TransportError::ProtocolError`]
175/// - [`TransportError::PayloadTooLarge`]
176/// - [`TransportError::RemoteError`] — the remote chose not to be specific;
177///   we do not guess.
178/// - [`TransportError::ServerError`] with any 4xx status.
179#[must_use]
180pub fn is_retryable(err: &TransportError) -> bool {
181    match err {
182        TransportError::ConnectionFailed => true,
183        TransportError::ServerError { status } => *status >= 500 || *status == 429,
184        _ => false,
185    }
186}
187
/// Maximum number of attempts in the default backoff ladder
/// ([`BackoffIterator::new`]).
///
/// SPEC-TRANSPORT §7: `attempt = 1; while attempt ≤ 5`.
pub const BACKOFF_MAX_ATTEMPTS: u32 = 5;

/// First delay of the default ladder; each later step doubles it.
pub const BACKOFF_INITIAL: Duration = Duration::from_secs(1);

/// Ceiling on every individual delay of the default ladder.
pub const BACKOFF_CAP: Duration = Duration::from_secs(300);
198
/// Per-pack body size ceiling enforced by every transport that ingests
/// pack bytes (HTTP `Content-Length`, S3 `GetObject`, SSH
/// `DownloadPackHeader.total_bytes`).
///
/// On 64-bit targets this is 4 GiB, matching the pack-format addressable
/// range. Narrower targets use their largest addressable buffer size
/// instead, so the crate still compiles there.
#[cfg(target_pointer_width = "64")]
pub const PACK_BODY_LIMIT: u64 = 4 << 30; // 4 GiB = 4 * 2^30 bytes

/// Per-pack body size ceiling. On this narrower-than-64-bit target it is
/// the largest buffer size `usize` can address.
#[cfg(not(target_pointer_width = "64"))]
pub const PACK_BODY_LIMIT: u64 = usize::MAX as u64;

/// [`PACK_BODY_LIMIT`] as a `usize`, for capping `Vec`-shaped buffers.
///
/// The compile-time assertion that follows fails the build on any target
/// where the value would not round-trip through `usize`.
// The assertion below proves the cast is lossless, so the truncation lint
// is a false positive for this item.
#[allow(clippy::cast_possible_truncation)]
pub const PACK_BODY_LIMIT_USIZE: usize = PACK_BODY_LIMIT as usize;
const _: () = assert!(
    PACK_BODY_LIMIT_USIZE as u64 == PACK_BODY_LIMIT,
    "PACK_BODY_LIMIT does not fit in usize on this target",
);
217
218/// Exponential-backoff iterator used by all transports.
219///
220/// Yields `[1s, 2s, 4s, 8s, 16s]` (5 attempts) for the default ladder,
221/// doubling each step and capping at 300s. This is the ladder mandated
222/// by SPEC-TRANSPORT §7 for `ConnectionFailed`, 5xx, and HTTP 429.
223///
224/// The iterator is self-contained — it holds no reference to a clock,
225/// so it can be constructed in tests and exhaustively enumerated.
226#[derive(Debug, Clone)]
227pub struct BackoffIterator {
228    next_delay: Duration,
229    attempts_remaining: u32,
230    cap: Duration,
231}
232
233impl BackoffIterator {
234    /// Default ladder: 5 attempts, starting at 1s, doubling, capped at 300s.
235    #[must_use]
236    pub const fn new() -> Self {
237        Self {
238            next_delay: BACKOFF_INITIAL,
239            attempts_remaining: BACKOFF_MAX_ATTEMPTS,
240            cap: BACKOFF_CAP,
241        }
242    }
243
244    /// Custom ladder for tests.
245    #[must_use]
246    pub const fn with(initial: Duration, cap: Duration, attempts: u32) -> Self {
247        Self {
248            next_delay: initial,
249            attempts_remaining: attempts,
250            cap,
251        }
252    }
253}
254
255impl Default for BackoffIterator {
256    fn default() -> Self {
257        Self::new()
258    }
259}
260
261impl Iterator for BackoffIterator {
262    type Item = Duration;
263
264    fn next(&mut self) -> Option<Self::Item> {
265        if self.attempts_remaining == 0 {
266            return None;
267        }
268        self.attempts_remaining -= 1;
269        let current = self.next_delay;
270        let doubled = current.saturating_mul(2);
271        self.next_delay = if doubled > self.cap {
272            self.cap
273        } else {
274            doubled
275        };
276        Some(current)
277    }
278}
279
280// ---------------------------------------------------------------------------
281// Transport trait
282// ---------------------------------------------------------------------------
283
284/// The mkit transport vtable.
285///
286/// Every transport (memory, file, HTTP, S3, SSH) implements this trait.
287/// Methods are synchronous and take `&self`; transports that need
288/// interior mutability (e.g. connection pools) MUST use a `Mutex` /
289/// `RwLock` internally. This keeps the trait object-safe.
290///
291/// All implementations MUST honour the retry policy in
292/// SPEC-TRANSPORT §7 internally OR document that the caller is
293/// responsible — the abstract trait takes no position. The
294/// [`is_retryable`] and [`BackoffIterator`] helpers are provided for
295/// implementations that embed the policy.
296pub trait Transport: Send + Sync {
297    /// Upload a pack. The digest is computed by the caller (BLAKE3 of
298    /// the full pack bytes) and used as the object key — servers MAY
299    /// dedupe on this key.
300    fn upload_pack(&self, bytes: &[u8], key: &PackKey) -> TransportResult<()>;
301
302    /// Download a pack by its digest.
303    ///
304    /// Returns [`TransportError::PackNotFound`] if the remote does not
305    /// hold this digest.
306    fn download_pack(&self, key: &PackKey) -> TransportResult<Vec<u8>>;
307
308    /// HEAD-check a pack. Cheaper than [`Self::download_pack`] on
309    /// network transports.
310    fn pack_exists(&self, key: &PackKey) -> TransportResult<bool>;
311
312    /// Unconditional ref write — equivalent to
313    /// `update_ref(name, RefWriteCondition::Any, hash)`.
314    ///
315    /// Default impl delegates to [`Self::update_ref`] so transports only
316    /// implement one entry point.
317    fn write_ref(&self, name: &str, hash: &Hash) -> TransportResult<()> {
318        self.update_ref(name, RefWriteCondition::Any, hash)
319    }
320
321    /// CAS ref write. See [`RefWriteCondition`].
322    ///
323    /// On `.missing` / `.match` CAS failure, returns
324    /// [`TransportError::RefConflict`]. Callers retrying after a
325    /// timeout MUST follow up with [`Self::read_ref`] to confirm
326    /// whether the first attempt actually landed (SPEC-TRANSPORT §7).
327    fn update_ref(
328        &self,
329        name: &str,
330        condition: RefWriteCondition,
331        hash: &Hash,
332    ) -> TransportResult<()>;
333
334    /// Read the current value of a ref, or `None` if it does not exist.
335    fn read_ref(&self, name: &str) -> TransportResult<Option<Hash>>;
336
337    /// List refs whose full name starts with `prefix`. Returned names
338    /// have `prefix` stripped per SPEC-REFS §4. An empty prefix lists
339    /// every ref.
340    fn list_refs(&self, prefix: &str) -> TransportResult<Vec<Ref>>;
341}
342
343// ---------------------------------------------------------------------------
344// async_shim — sync/async bridge for transports that wrap an async cipher
345// ---------------------------------------------------------------------------
346
/// Sync-over-async bridge for transports whose cipher or I/O layer is
/// async (e.g. `commonware-stream::encrypted`) while their [`Transport`]
/// surface stays deliberately synchronous.
///
/// It lives in `mkit-core`, the trait crate, because it is generic
/// infrastructure. Several transports, and Phase 2 sparse checkout, plug
/// in at this same point. It depends on no executor (`tokio`,
/// `commonware-runtime`, or any other); the caller picks the runner.
///
/// # Why a trait
///
/// `mkit-transport-enc` and (in Phase 2) `mkit-core::sparse` must drive
/// `async fn` bodies from sync methods. Hard-coding
/// `tokio::runtime::Handle::block_on` would spread tokio across the
/// workspace. Hard-coding `commonware_runtime::deterministic` would make
/// production run on the test runtime. A pluggable `Executor` leaves the
/// runtime choice to the consuming crate.
pub mod async_shim {
    /// Runs an async future to completion on the calling thread.
    ///
    /// Pluggable so callers can pick `tokio`, `commonware-runtime`'s
    /// deterministic runner (tests), or the planned production tokio runner.
    /// `mkit-core` never takes a compile-time dependency on a specific
    /// runtime crate.
    ///
    /// `block_on` is generic over the future, so this trait is meant to be
    /// used as a generic bound (`E: Executor`) rather than as a
    /// `dyn Executor` trait object.
    ///
    /// Implementations MUST be re-entrancy-safe in the sense the chosen
    /// runtime expects. Calling `block_on` from inside a task already running
    /// on the same runtime will typically panic or deadlock. The contract is
    /// "a synchronous external API wrapping async internals", not arbitrary
    /// async-from-sync recursion.
    pub trait Executor: Send + Sync {
        /// Block the current thread until `fut` resolves, returning its output.
        fn block_on<F: core::future::Future<Output = T> + Send, T: Send>(
            &self,
            fut: F,
        ) -> T;
    }
}
386
387// ---------------------------------------------------------------------------
388// Tests
389// ---------------------------------------------------------------------------
390
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pack_key_hex_roundtrip() {
        let bytes = [0x42u8; 32];
        let pk = PackKey::new(bytes);
        let hex = pk.to_hex();
        assert_eq!(hex.len(), 64);
        assert_eq!(hex, "42".repeat(32));
        let pk2 = pack_key_from_hex(&hex).unwrap();
        assert_eq!(pk, pk2);
    }

    #[test]
    fn pack_key_display_matches_to_hex() {
        let pk = PackKey::new([0xabu8; 32]);
        assert_eq!(pk.to_string(), pk.to_hex());
    }

    #[test]
    fn pack_key_hash_conversions_roundtrip() {
        let raw: Hash = [7u8; 32];
        let pk = PackKey::from(raw);
        assert_eq!(pk, PackKey::from_hash(raw));
        assert_eq!(pk.as_bytes(), &raw);
        assert_eq!(pk.into_hash(), raw);
        assert_eq!(Hash::from(pk), raw);
    }

    #[test]
    fn is_retryable_matches_spec() {
        assert!(is_retryable(&TransportError::ConnectionFailed));
        assert!(is_retryable(&TransportError::ServerError { status: 500 }));
        assert!(is_retryable(&TransportError::ServerError { status: 503 }));
        assert!(is_retryable(&TransportError::ServerError { status: 599 }));
        assert!(is_retryable(&TransportError::ServerError { status: 429 }));
        assert!(!is_retryable(&TransportError::ServerError { status: 404 }));
        assert!(!is_retryable(&TransportError::ServerError { status: 401 }));
        assert!(!is_retryable(&TransportError::ServerError { status: 499 }));
        // Transports without a native status code report 0.
        assert!(!is_retryable(&TransportError::ServerError { status: 0 }));
        assert!(!is_retryable(&TransportError::PackNotFound));
        assert!(!is_retryable(&TransportError::AccessDenied));
        assert!(!is_retryable(&TransportError::RefConflict));
        assert!(!is_retryable(&TransportError::RemoteError("boom".into())));
        assert!(!is_retryable(&TransportError::InvalidRef("bad".into())));
        assert!(!is_retryable(&TransportError::InvalidResponse));
        assert!(!is_retryable(&TransportError::ProtocolError));
        assert!(!is_retryable(&TransportError::PayloadTooLarge(1)));
        assert!(!is_retryable(&TransportError::InsecureScheme));
    }

    #[test]
    fn backoff_default_ladder_is_1_2_4_8_16() {
        let delays: Vec<Duration> = BackoffIterator::new().collect();
        assert_eq!(
            delays,
            vec![
                Duration::from_secs(1),
                Duration::from_secs(2),
                Duration::from_secs(4),
                Duration::from_secs(8),
                Duration::from_secs(16),
            ]
        );
    }

    #[test]
    fn backoff_default_matches_new() {
        let via_default: Vec<Duration> = BackoffIterator::default().collect();
        let via_new: Vec<Duration> = BackoffIterator::new().collect();
        assert_eq!(via_default, via_new);
    }

    #[test]
    fn backoff_caps_at_max() {
        let cap = Duration::from_secs(10);
        let delays: Vec<Duration> = BackoffIterator::with(Duration::from_secs(8), cap, 5).collect();
        // 8s first; 16s would exceed the 10s cap, so every later step is clamped to 10s.
        assert_eq!(delays, vec![Duration::from_secs(8), cap, cap, cap, cap]);
    }

    #[test]
    fn backoff_is_exhausted_after_attempts() {
        let mut it = BackoffIterator::with(Duration::from_secs(1), Duration::from_secs(300), 2);
        assert_eq!(it.next(), Some(Duration::from_secs(1)));
        assert_eq!(it.next(), Some(Duration::from_secs(2)));
        assert_eq!(it.next(), None);
        assert_eq!(it.next(), None);
        let empty = BackoffIterator::with(Duration::from_secs(1), Duration::from_secs(1), 0);
        assert_eq!(empty.count(), 0);
    }
}