Skip to main content

peat_protocol/storage/
blob_document_integration.rs

//! Document-Blob Integration (ADR-025 Phase 2)
//!
//! This module connects blob storage with CRDT document sync, enabling:
//! - Storing blob tokens in documents for mesh synchronization
//! - Retrieving blob tokens from synced documents
//! - Automatic blob fetching when documents sync
//!
//! # How It Works
//!
//! 1. **Create Blob**: Use `BlobStore::create_blob()` to add content
//! 2. **Store Reference**: Use `store_blob_reference()` to put the token in a document
//! 3. **Sync**: The document syncs to peers via CRDT (Automerge)
//! 4. **Fetch**: A peer retrieves the token and uses `fetch_blob()` to download the content
//!
//! # Example
//!
//! ```ignore
//! use std::path::Path;
//!
//! use peat_protocol::storage::{
//!     BlobDocumentIntegration, BlobStore, BlobMetadata,
//! };
//!
//! // Create blob (backend-specific, e.g. NetworkedIrohBlobStore)
//! let token = blob_store.create_blob(
//!     Path::new("/models/target_recognition.onnx"),
//!     BlobMetadata::with_name("target_recognition.onnx"),
//! ).await?;
//!
//! // Store in model registry document via the BlobDocumentIntegration trait
//! integration.store_blob_reference(
//!     "model_registry",
//!     "target_recognition:4.2.1",
//!     "model_blob",
//!     &token,
//! ).await?;
//!
//! // On another node after sync...
//! if let Some(token) = integration.get_blob_reference(
//!     "model_registry",
//!     "target_recognition:4.2.1",
//!     "model_blob",
//! ).await? {
//!     let handle = blob_store.fetch_blob(&token, |p| println!("{:?}", p)).await?;
//!     println!("Model available at: {}", handle.path().display());
//! }
//! ```
46
47use super::blob_traits::{BlobHash, BlobMetadata, BlobProgress, BlobToken};
48use anyhow::Result;
49use serde::{Deserialize, Serialize};
50use std::collections::HashMap;
51
52/// Serializable blob reference for storage in documents
53///
54/// This struct is stored as a JSON field in CRDT documents.
55/// When the document syncs, the blob reference syncs too.
56#[derive(Clone, Debug, Serialize, Deserialize)]
57pub struct BlobReference {
58    /// Content hash (content-addressed ID)
59    pub hash: String,
60    /// Size in bytes
61    pub size_bytes: u64,
62    /// Blob metadata
63    pub metadata: BlobReferenceMetadata,
64}
65
66/// Metadata within a blob reference
67#[derive(Clone, Debug, Serialize, Deserialize, Default)]
68pub struct BlobReferenceMetadata {
69    /// Human-readable name
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub name: Option<String>,
72    /// MIME type
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub content_type: Option<String>,
75    /// Custom key-value pairs
76    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
77    pub custom: HashMap<String, String>,
78}
79
80impl From<&BlobToken> for BlobReference {
81    fn from(token: &BlobToken) -> Self {
82        Self {
83            hash: token.hash.as_hex().to_string(),
84            size_bytes: token.size_bytes,
85            metadata: BlobReferenceMetadata {
86                name: token.metadata.name.clone(),
87                content_type: token.metadata.content_type.clone(),
88                custom: token.metadata.custom.clone(),
89            },
90        }
91    }
92}
93
94impl From<BlobReference> for BlobToken {
95    fn from(reference: BlobReference) -> Self {
96        Self {
97            hash: BlobHash::from_hex(&reference.hash),
98            size_bytes: reference.size_bytes,
99            metadata: BlobMetadata {
100                name: reference.metadata.name,
101                content_type: reference.metadata.content_type,
102                custom: reference.metadata.custom,
103            },
104        }
105    }
106}
107
108/// Trait for integrating blob storage with CRDT documents
109///
110/// This trait enables storing blob tokens in documents that sync
111/// via CRDT, allowing blobs to be discovered through document queries.
112#[async_trait::async_trait]
113pub trait BlobDocumentIntegration: Send + Sync {
114    /// Store a blob token reference in a document field
115    ///
116    /// The token is serialized and stored in the specified field.
117    /// When the document syncs to other nodes, they can retrieve
118    /// the token and fetch the blob content.
119    ///
120    /// # Arguments
121    /// * `collection` - Collection name
122    /// * `doc_id` - Document ID
123    /// * `field` - Field name to store the token in
124    /// * `token` - Blob token to store
125    ///
126    /// # Note
127    /// If the document doesn't exist, it will be created.
128    /// If the field already exists, it will be overwritten.
129    async fn store_blob_reference(
130        &self,
131        collection: &str,
132        doc_id: &str,
133        field: &str,
134        token: &BlobToken,
135    ) -> Result<()>;
136
137    /// Retrieve a blob token from a document field
138    ///
139    /// Reads the specified field from the document and deserializes
140    /// it as a blob token.
141    ///
142    /// # Arguments
143    /// * `collection` - Collection name
144    /// * `doc_id` - Document ID
145    /// * `field` - Field name containing the token
146    ///
147    /// # Returns
148    /// `Some(token)` if the field exists and is valid, `None` otherwise
149    async fn get_blob_reference(
150        &self,
151        collection: &str,
152        doc_id: &str,
153        field: &str,
154    ) -> Result<Option<BlobToken>>;
155
156    /// Remove a blob reference from a document
157    ///
158    /// Sets the field to null, indicating no blob is referenced.
159    /// The blob itself is NOT deleted - just the reference.
160    async fn remove_blob_reference(
161        &self,
162        collection: &str,
163        doc_id: &str,
164        field: &str,
165    ) -> Result<()>;
166
167    /// List all blob references in a document
168    ///
169    /// Scans the document for fields containing blob references.
170    /// Useful for discovering all blobs associated with a document.
171    ///
172    /// # Returns
173    /// Map of field_name -> BlobToken for all blob reference fields
174    async fn list_blob_references(
175        &self,
176        collection: &str,
177        doc_id: &str,
178    ) -> Result<HashMap<String, BlobToken>>;
179
180    /// Store blob reference and fetch it locally
181    ///
182    /// Convenience method that stores the reference and immediately
183    /// fetches the blob to ensure it's available locally.
184    async fn store_and_fetch<F>(
185        &self,
186        collection: &str,
187        doc_id: &str,
188        field: &str,
189        token: &BlobToken,
190        progress: F,
191    ) -> Result<std::path::PathBuf>
192    where
193        F: FnMut(BlobProgress) + Send + 'static;
194}
195
196// ============================================================================
197// Model Registry Helper Types (for ADR-022 integration)
198// ============================================================================
199
200/// Model variant blob reference with execution requirements
201#[derive(Clone, Debug, Serialize, Deserialize)]
202pub struct ModelVariantBlob {
203    /// Blob reference for this variant
204    pub blob: BlobReference,
205    /// Precision (e.g., "float32", "float16", "int8")
206    pub precision: String,
207    /// Supported execution providers (e.g., ["CUDAExecutionProvider", "CPUExecutionProvider"])
208    pub execution_providers: Vec<String>,
209    /// Minimum GPU memory required in GB (if applicable)
210    #[serde(skip_serializing_if = "Option::is_none")]
211    pub min_gpu_memory_gb: Option<f64>,
212}
213
214/// Model registry document with blob references
215///
216/// This schema matches ADR-022 model registry format with blob tokens.
217#[derive(Clone, Debug, Serialize, Deserialize)]
218pub struct ModelRegistryDocument {
219    /// Model identifier (e.g., "target_recognition")
220    pub model_id: String,
221    /// Semantic version (e.g., "4.2.1")
222    pub version: String,
223    /// Available model variants keyed by variant ID
224    pub variants: HashMap<String, ModelVariantBlob>,
225    /// Model provenance information
226    #[serde(skip_serializing_if = "Option::is_none")]
227    pub provenance: Option<ModelProvenance>,
228    /// Human-readable description
229    #[serde(skip_serializing_if = "Option::is_none")]
230    pub description: Option<String>,
231}
232
233/// Model provenance and signing information
234#[derive(Clone, Debug, Serialize, Deserialize)]
235pub struct ModelProvenance {
236    /// Entity that signed the model
237    pub signed_by: String,
238    /// Cryptographic signature
239    pub signature: String,
240    /// Timestamp of signing
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub signed_at: Option<String>,
243}
244
245impl ModelRegistryDocument {
246    /// Create a new model registry document
247    pub fn new(model_id: &str, version: &str) -> Self {
248        Self {
249            model_id: model_id.to_string(),
250            version: version.to_string(),
251            variants: HashMap::new(),
252            provenance: None,
253            description: None,
254        }
255    }
256
257    /// Add a model variant with blob reference
258    pub fn add_variant(
259        &mut self,
260        variant_id: &str,
261        token: &BlobToken,
262        precision: &str,
263        execution_providers: Vec<String>,
264        min_gpu_memory_gb: Option<f64>,
265    ) {
266        self.variants.insert(
267            variant_id.to_string(),
268            ModelVariantBlob {
269                blob: BlobReference::from(token),
270                precision: precision.to_string(),
271                execution_providers,
272                min_gpu_memory_gb,
273            },
274        );
275    }
276
277    /// Get document ID (model_id:version)
278    pub fn doc_id(&self) -> String {
279        format!("{}:{}", self.model_id, self.version)
280    }
281}
282
283// ============================================================================
284// Tests
285// ============================================================================
286
#[cfg(test)]
mod tests {
    use super::*;

    /// A token converted to a reference survives a JSON round trip and converts back.
    #[test]
    fn test_blob_reference_serialization() {
        let metadata = BlobMetadata {
            name: Some("test.onnx".to_string()),
            content_type: Some("application/onnx".to_string()),
            custom: HashMap::new(),
        };
        let token = BlobToken {
            hash: BlobHash::from_hex("abc123def456"),
            size_bytes: 1024 * 1024,
            metadata,
        };

        let json = serde_json::to_string_pretty(&BlobReference::from(&token)).unwrap();

        // The hash, the byte count and the name must all show up in the JSON text.
        for expected in &["abc123def456", "1048576", "test.onnx"] {
            assert!(json.contains(*expected));
        }

        // Parse the JSON again and rebuild a token from the parsed reference.
        let parsed: BlobReference = serde_json::from_str(&json).unwrap();
        let restored: BlobToken = parsed.into();

        assert_eq!(restored.hash.as_hex(), token.hash.as_hex());
        assert_eq!(restored.size_bytes, token.size_bytes);
        assert_eq!(restored.metadata.name, token.metadata.name);
    }

    /// A registry document keeps an added variant and serializes its contents.
    #[test]
    fn test_model_registry_document() {
        let token = BlobToken {
            hash: BlobHash::from_hex("sha256:abc123"),
            size_bytes: 500_000_000,
            metadata: BlobMetadata::with_name("target_recognition_fp32.onnx"),
        };
        let providers = vec!["CUDAExecutionProvider".to_string()];

        let mut registry_doc = ModelRegistryDocument::new("target_recognition", "4.2.1");
        registry_doc.add_variant("fp32_cuda", &token, "float32", providers, Some(4.0));

        assert_eq!(registry_doc.doc_id(), "target_recognition:4.2.1");
        assert!(registry_doc.variants.contains_key("fp32_cuda"));

        let json = serde_json::to_string_pretty(&registry_doc).unwrap();
        assert!(json.contains("target_recognition"));
        assert!(json.contains("CUDAExecutionProvider"));
    }

    /// Custom metadata entries are serialized and come back after a round trip.
    #[test]
    fn test_blob_reference_with_custom_metadata() {
        let custom: HashMap<String, String> =
            [("training_date", "2025-01-15"), ("accuracy", "0.95")]
                .iter()
                .map(|&(key, value)| (key.to_string(), value.to_string()))
                .collect();

        let token = BlobToken {
            hash: BlobHash::from_hex("deadbeef"),
            size_bytes: 100,
            metadata: BlobMetadata {
                name: Some("model.onnx".to_string()),
                content_type: None,
                custom,
            },
        };

        let json = serde_json::to_string(&BlobReference::from(&token)).unwrap();

        // Both custom keys must be present in the serialized text.
        assert!(json.contains("training_date"));
        assert!(json.contains("accuracy"));

        // After the round trip the custom value must be unchanged.
        let parsed: BlobReference = serde_json::from_str(&json).unwrap();
        assert_eq!(
            parsed.metadata.custom.get("training_date").map(String::as_str),
            Some("2025-01-15")
        );
    }
}
375}