Skip to main content

axon/buffer/
mod.rs

1//! Zero-copy multimodal buffers — §λ-L-E Fase 11.b.
2//!
3//! Bytes that enter the runtime through a network socket, file
4//! handle or FFI boundary land **directly** in a region of Rust-
5//! owned memory. The Python layer manipulates `SymbolicPtr<T>`
6//! handles (Arc clones) — never the raw bytes — until the final
7//! consumer (transcoder, compressor, sink) asks for a slice.
8//!
9//! Three building blocks:
10//!
11//! - [`BufferKind`] — content-kind tag (`raw`, `pcm16`, `mulaw8`,
12//!   `jpeg`, …). Extensible (unlike the closed catalogues of 11.a);
13//!   adopters can register new kinds via [`BufferKind::register`].
14//! - [`ZeroCopyBuffer`] — the primitive. An `Arc<[u8]>` plus a
15//!   [`BufferKind`] tag. Clone is O(1) (Arc refcount bump); slicing
16//!   returns another `ZeroCopyBuffer` that shares the same backing
17//!   allocation.
18//! - [`BufferMut`] — mutable in-flight builder. Accumulates bytes
19//!   during ingest then `.freeze()`s into an immutable
20//!   `ZeroCopyBuffer` at end-of-stream.
21//!
22//! Pool-backed allocation lives in the sibling [`pool`] module.
23
24pub mod kind;
25pub mod pool;
26
27use std::sync::Arc;
28
29pub use self::kind::{BufferKind, BufferKindRegistry};
30pub use self::pool::{BufferPool, BufferPoolSnapshot, PoolClass};
31
32// ── ZeroCopyBuffer ────────────────────────────────────────────────────
33
34/// An immutable view over a region of bytes. Cloneable at O(1) — the
35/// backing storage is an `Arc<[u8]>` so clones are refcount bumps.
36///
37/// Slicing returns another `ZeroCopyBuffer` that references the same
38/// underlying allocation: no copies, no new allocations on the hot
39/// path. When the last `ZeroCopyBuffer` referencing an allocation
40/// drops, the storage returns to the [`BufferPool`] (if it came from
41/// one) or is freed (if direct-allocated).
42#[derive(Debug, Clone)]
43pub struct ZeroCopyBuffer {
44    /// Backing storage. `Arc` gives us cheap clones + automatic
45    /// release when all views drop.
46    storage: Arc<[u8]>,
47    /// Sub-range within `storage` that this view exposes. Invariant:
48    /// `start <= end <= storage.len()`.
49    start: usize,
50    end: usize,
51    /// Content-kind tag. `raw` for untagged bytes; adopters upgrade
52    /// to domain-specific kinds at the point where they know (e.g.
53    /// after parsing a multipart header, upgrade `raw` → `jpeg`).
54    kind: BufferKind,
55    /// Opaque tenant slug for pool bookkeeping. `None` when the
56    /// buffer was direct-allocated outside a tenant context.
57    tenant_id: Option<Arc<str>>,
58}
59
60impl ZeroCopyBuffer {
61    /// Construct from an already-owned byte slice. Copies once into
62    /// the Arc — use [`ZeroCopyBuffer::from_arc`] to avoid that copy
63    /// when the caller already has an `Arc<[u8]>`.
64    pub fn from_bytes(bytes: impl Into<Vec<u8>>, kind: BufferKind) -> Self {
65        let v = bytes.into();
66        let len = v.len();
67        let storage: Arc<[u8]> = v.into();
68        ZeroCopyBuffer {
69            storage,
70            start: 0,
71            end: len,
72            kind,
73            tenant_id: None,
74        }
75    }
76
77    /// Construct from an existing `Arc<[u8]>` without copying.
78    pub fn from_arc(storage: Arc<[u8]>, kind: BufferKind) -> Self {
79        let len = storage.len();
80        ZeroCopyBuffer {
81            storage,
82            start: 0,
83            end: len,
84            kind,
85            tenant_id: None,
86        }
87    }
88
89    /// Tag the buffer with its owning tenant (for pool accounting).
90    pub fn with_tenant(mut self, tenant_id: impl Into<Arc<str>>) -> Self {
91        self.tenant_id = Some(tenant_id.into());
92        self
93    }
94
95    /// The buffer's visible length in bytes.
96    pub fn len(&self) -> usize {
97        self.end - self.start
98    }
99
100    pub fn is_empty(&self) -> bool {
101        self.len() == 0
102    }
103
104    /// Content-kind tag.
105    pub fn kind(&self) -> BufferKind {
106        self.kind.clone()
107    }
108
109    pub fn tenant_id(&self) -> Option<&str> {
110        self.tenant_id.as_deref()
111    }
112
113    /// Upgrade the content-kind tag (e.g. `raw` → `jpeg` after
114    /// format detection). Returns a new `ZeroCopyBuffer` that shares
115    /// the backing storage; the original view is untouched so other
116    /// holders see the old kind unchanged.
117    pub fn retag(&self, kind: BufferKind) -> Self {
118        let mut cloned = self.clone();
119        cloned.kind = kind;
120        cloned
121    }
122
123    /// Borrow the visible byte range as a slice. Callers usually
124    /// reach for [`ZeroCopyBuffer::as_slice`] (alias) or feed this
125    /// straight to their consumer. **Do not** copy the slice on the
126    /// hot path — pass the `ZeroCopyBuffer` by reference instead.
127    pub fn as_slice(&self) -> &[u8] {
128        &self.storage[self.start..self.end]
129    }
130
131    /// Return a sub-view into this buffer. O(1) — no copy; the
132    /// returned buffer shares the same `Arc`.
133    pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
134        let len = self.len();
135        assert!(
136            range.start <= range.end,
137            "slice start {} > end {}",
138            range.start,
139            range.end
140        );
141        assert!(
142            range.end <= len,
143            "slice end {} exceeds buffer len {}",
144            range.end,
145            len
146        );
147        ZeroCopyBuffer {
148            storage: Arc::clone(&self.storage),
149            start: self.start + range.start,
150            end: self.start + range.end,
151            kind: self.kind.clone(),
152            tenant_id: self.tenant_id.clone(),
153        }
154    }
155
156    /// Number of live views over the backing storage (Arc strong
157    /// count). Useful for observability; do NOT use for flow control.
158    pub fn sharers(&self) -> usize {
159        Arc::strong_count(&self.storage)
160    }
161
162    /// Compute a SHA-256 over the visible slice. Not cached — callers
163    /// that need repeated hashes should wrap in their own cache.
164    pub fn sha256(&self) -> [u8; 32] {
165        use sha2::{Digest, Sha256};
166        let mut h = Sha256::new();
167        h.update(self.as_slice());
168        let out = h.finalize();
169        let mut array = [0u8; 32];
170        array.copy_from_slice(&out);
171        array
172    }
173}
174
175// ── BufferMut (in-flight builder) ────────────────────────────────────
176
177/// Mutable append-only builder used while bytes are still arriving.
178/// When the ingest stream terminates, call [`BufferMut::freeze`] to
179/// convert into an immutable [`ZeroCopyBuffer`] with a single Arc
180/// construction (no copy).
181#[derive(Debug)]
182pub struct BufferMut {
183    storage: Vec<u8>,
184    kind: BufferKind,
185    tenant_id: Option<Arc<str>>,
186}
187
188impl BufferMut {
189    /// Build with initial reserved capacity.
190    pub fn with_capacity(capacity: usize, kind: BufferKind) -> Self {
191        BufferMut {
192            storage: Vec::with_capacity(capacity),
193            kind,
194            tenant_id: None,
195        }
196    }
197
198    pub fn with_tenant(mut self, tenant_id: impl Into<Arc<str>>) -> Self {
199        self.tenant_id = Some(tenant_id.into());
200        self
201    }
202
203    pub fn len(&self) -> usize {
204        self.storage.len()
205    }
206
207    pub fn is_empty(&self) -> bool {
208        self.storage.is_empty()
209    }
210
211    pub fn capacity(&self) -> usize {
212        self.storage.capacity()
213    }
214
215    /// Append bytes. Grows the internal `Vec` using the standard
216    /// doubling policy.
217    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
218        self.storage.extend_from_slice(bytes);
219    }
220
221    /// Freeze into an immutable `ZeroCopyBuffer`. This is the moment
222    /// the storage transitions from `Vec<u8>` (unique mutable) to
223    /// `Arc<[u8]>` (shared immutable). The `Vec -> Arc<[u8]>`
224    /// conversion reuses the allocation when possible.
225    pub fn freeze(self) -> ZeroCopyBuffer {
226        let len = self.storage.len();
227        let storage: Arc<[u8]> = self.storage.into();
228        ZeroCopyBuffer {
229            storage,
230            start: 0,
231            end: len,
232            kind: self.kind,
233            tenant_id: self.tenant_id,
234        }
235    }
236}
237
/// Unit tests for [`ZeroCopyBuffer`] and [`BufferMut`].
#[cfg(test)]
mod tests {
    use super::*;

    /// A buffer built from bytes exposes exactly those bytes and the
    /// kind it was tagged with.
    #[test]
    fn from_bytes_roundtrip() {
        let b = ZeroCopyBuffer::from_bytes(vec![1, 2, 3], BufferKind::raw());
        assert_eq!(b.len(), 3);
        assert!(!b.is_empty());
        assert_eq!(b.as_slice(), &[1, 2, 3]);
        assert_eq!(b.kind(), BufferKind::raw());
    }

    /// Cloning bumps the shared refcount; dropping a clone releases it.
    #[test]
    fn clone_shares_storage() {
        let b = ZeroCopyBuffer::from_bytes(vec![0u8; 1024], BufferKind::raw());
        let c = b.clone();
        assert_eq!(b.sharers(), 2);
        assert_eq!(c.sharers(), 2);
        drop(c);
        assert_eq!(b.sharers(), 1);
    }

    /// A sub-view shares the parent's allocation and inherits its kind.
    #[test]
    fn slice_shares_storage_and_preserves_kind() {
        let b = ZeroCopyBuffer::from_bytes(vec![10, 20, 30, 40], BufferKind::pcm16());
        let s = b.slice(1..3);
        assert_eq!(s.as_slice(), &[20, 30]);
        assert_eq!(s.kind(), BufferKind::pcm16());
        assert_eq!(b.sharers(), 2);
    }

    /// Slicing a slice resolves offsets against the parent *view*,
    /// not against the start of the backing allocation.
    #[test]
    fn nested_slice_offsets_are_relative_to_parent_view() {
        let b = ZeroCopyBuffer::from_bytes(vec![10, 20, 30, 40, 50], BufferKind::raw());
        let inner = b.slice(1..4).slice(1..3);
        assert_eq!(inner.as_slice(), &[30, 40]);
        assert_eq!(inner.len(), 2);
    }

    /// A zero-width range is valid and yields an empty view.
    #[test]
    fn empty_slice_is_allowed() {
        let b = ZeroCopyBuffer::from_bytes(vec![1, 2, 3], BufferKind::raw());
        let s = b.slice(2..2);
        assert!(s.is_empty());
        assert_eq!(s.as_slice(), &[] as &[u8]);
    }

    /// An end offset past the visible length is a caller bug and panics.
    #[test]
    #[should_panic(expected = "exceeds buffer len")]
    fn slice_out_of_range_panics() {
        let b = ZeroCopyBuffer::from_bytes(vec![1, 2, 3], BufferKind::raw());
        let _ = b.slice(0..10);
    }

    /// `retag` produces a new view; the original keeps its old kind.
    #[test]
    fn retag_leaves_original_kind() {
        let b = ZeroCopyBuffer::from_bytes(vec![1, 2, 3], BufferKind::raw());
        let j = b.retag(BufferKind::jpeg());
        assert_eq!(b.kind(), BufferKind::raw());
        assert_eq!(j.kind(), BufferKind::jpeg());
        // Storage is still shared between exactly the two live views.
        assert_eq!(b.sharers(), 2);
    }

    /// Freezing yields the concatenation of everything appended.
    #[test]
    fn buffer_mut_freeze_preserves_contents() {
        let mut bm = BufferMut::with_capacity(1024, BufferKind::raw());
        assert!(bm.is_empty());
        bm.extend_from_slice(b"hello ");
        bm.extend_from_slice(b"world");
        assert_eq!(bm.len(), 11);
        let frozen = bm.freeze();
        assert_eq!(frozen.as_slice(), b"hello world");
        assert_eq!(frozen.len(), 11);
    }

    /// Kind and tenant set on the builder survive `freeze`.
    #[test]
    fn buffer_mut_freeze_carries_kind_and_tenant() {
        let mut bm = BufferMut::with_capacity(8, BufferKind::pcm16()).with_tenant("alpha");
        bm.extend_from_slice(&[1, 2, 3, 4]);
        let frozen = bm.freeze();
        assert_eq!(frozen.kind(), BufferKind::pcm16());
        assert_eq!(frozen.tenant_id(), Some("alpha"));
    }

    /// The digest covers only the visible window, not the whole
    /// backing allocation.
    #[test]
    fn sha256_computes_on_visible_slice_only() {
        let b = ZeroCopyBuffer::from_bytes(vec![b'a', b'b', b'c', b'd'], BufferKind::raw());
        let s = b.slice(1..3); // "bc"
        let reference = ZeroCopyBuffer::from_bytes(b"bc".to_vec(), BufferKind::raw());
        assert_eq!(s.sha256(), reference.sha256());
        assert_ne!(s.sha256(), b.sha256());
    }

    /// The tenant tag follows the buffer through clones and sub-views.
    #[test]
    fn tenant_tag_propagates_through_clone_and_slice() {
        let b = ZeroCopyBuffer::from_bytes(vec![0u8; 16], BufferKind::raw()).with_tenant("alpha");
        let c = b.clone();
        assert_eq!(c.tenant_id(), Some("alpha"));
        let s = b.slice(0..4);
        assert_eq!(s.tenant_id(), Some("alpha"));
    }
}