peat_protocol/storage/blob_document_integration.rs
1//! Document-Blob Integration (ADR-025 Phase 2)
2//!
3//! This module connects blob storage with CRDT document sync, enabling:
4//! - Store blob tokens in documents for mesh synchronization
5//! - Retrieve blob tokens from synced documents
6//! - Automatic blob fetching when documents sync
7//!
8//! # How It Works
9//!
10//! 1. **Create Blob**: Use `BlobStore::create_blob()` to add content
11//! 2. **Store Reference**: Use `store_blob_reference()` to put token in document
12//! 3. **Sync**: Document syncs to peers via CRDT (Automerge)
13//! 4. **Fetch**: Peer retrieves token, uses `fetch_blob()` to download content
14//!
15//! # Example
16//!
17//! ```ignore
18//! use peat_protocol::storage::{
19//! BlobDocumentIntegration, BlobStore, BlobMetadata,
20//! };
21//!
22//! // Create blob (backend-specific, e.g. NetworkedIrohBlobStore)
23//! let token = blob_store.create_blob(
24//! Path::new("/models/target_recognition.onnx"),
25//! BlobMetadata::with_name("target_recognition.onnx"),
26//! ).await?;
27//!
28//! // Store in model registry document via the BlobDocumentIntegration trait
29//! integration.store_blob_reference(
30//! "model_registry",
31//! "target_recognition:4.2.1",
32//! "model_blob",
33//! &token,
34//! ).await?;
35//!
36//! // On another node after sync...
37//! if let Some(token) = integration.get_blob_reference(
38//! "model_registry",
39//! "target_recognition:4.2.1",
40//! "model_blob",
41//! ).await? {
42//! let handle = blob_store.fetch_blob(&token, |p| println!("{:?}", p)).await?;
43//! println!("Model available at: {}", handle.path().display());
44//! }
45//! ```
46
47use super::blob_traits::{BlobHash, BlobMetadata, BlobProgress, BlobToken};
48use anyhow::Result;
49use serde::{Deserialize, Serialize};
50use std::collections::HashMap;
51
52/// Serializable blob reference for storage in documents
53///
54/// This struct is stored as a JSON field in CRDT documents.
55/// When the document syncs, the blob reference syncs too.
56#[derive(Clone, Debug, Serialize, Deserialize)]
57pub struct BlobReference {
58 /// Content hash (content-addressed ID)
59 pub hash: String,
60 /// Size in bytes
61 pub size_bytes: u64,
62 /// Blob metadata
63 pub metadata: BlobReferenceMetadata,
64}
65
66/// Metadata within a blob reference
67#[derive(Clone, Debug, Serialize, Deserialize, Default)]
68pub struct BlobReferenceMetadata {
69 /// Human-readable name
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub name: Option<String>,
72 /// MIME type
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub content_type: Option<String>,
75 /// Custom key-value pairs
76 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
77 pub custom: HashMap<String, String>,
78}
79
80impl From<&BlobToken> for BlobReference {
81 fn from(token: &BlobToken) -> Self {
82 Self {
83 hash: token.hash.as_hex().to_string(),
84 size_bytes: token.size_bytes,
85 metadata: BlobReferenceMetadata {
86 name: token.metadata.name.clone(),
87 content_type: token.metadata.content_type.clone(),
88 custom: token.metadata.custom.clone(),
89 },
90 }
91 }
92}
93
94impl From<BlobReference> for BlobToken {
95 fn from(reference: BlobReference) -> Self {
96 Self {
97 hash: BlobHash::from_hex(&reference.hash),
98 size_bytes: reference.size_bytes,
99 metadata: BlobMetadata {
100 name: reference.metadata.name,
101 content_type: reference.metadata.content_type,
102 custom: reference.metadata.custom,
103 },
104 }
105 }
106}
107
108/// Trait for integrating blob storage with CRDT documents
109///
110/// This trait enables storing blob tokens in documents that sync
111/// via CRDT, allowing blobs to be discovered through document queries.
112#[async_trait::async_trait]
113pub trait BlobDocumentIntegration: Send + Sync {
114 /// Store a blob token reference in a document field
115 ///
116 /// The token is serialized and stored in the specified field.
117 /// When the document syncs to other nodes, they can retrieve
118 /// the token and fetch the blob content.
119 ///
120 /// # Arguments
121 /// * `collection` - Collection name
122 /// * `doc_id` - Document ID
123 /// * `field` - Field name to store the token in
124 /// * `token` - Blob token to store
125 ///
126 /// # Note
127 /// If the document doesn't exist, it will be created.
128 /// If the field already exists, it will be overwritten.
129 async fn store_blob_reference(
130 &self,
131 collection: &str,
132 doc_id: &str,
133 field: &str,
134 token: &BlobToken,
135 ) -> Result<()>;
136
137 /// Retrieve a blob token from a document field
138 ///
139 /// Reads the specified field from the document and deserializes
140 /// it as a blob token.
141 ///
142 /// # Arguments
143 /// * `collection` - Collection name
144 /// * `doc_id` - Document ID
145 /// * `field` - Field name containing the token
146 ///
147 /// # Returns
148 /// `Some(token)` if the field exists and is valid, `None` otherwise
149 async fn get_blob_reference(
150 &self,
151 collection: &str,
152 doc_id: &str,
153 field: &str,
154 ) -> Result<Option<BlobToken>>;
155
156 /// Remove a blob reference from a document
157 ///
158 /// Sets the field to null, indicating no blob is referenced.
159 /// The blob itself is NOT deleted - just the reference.
160 async fn remove_blob_reference(
161 &self,
162 collection: &str,
163 doc_id: &str,
164 field: &str,
165 ) -> Result<()>;
166
167 /// List all blob references in a document
168 ///
169 /// Scans the document for fields containing blob references.
170 /// Useful for discovering all blobs associated with a document.
171 ///
172 /// # Returns
173 /// Map of field_name -> BlobToken for all blob reference fields
174 async fn list_blob_references(
175 &self,
176 collection: &str,
177 doc_id: &str,
178 ) -> Result<HashMap<String, BlobToken>>;
179
180 /// Store blob reference and fetch it locally
181 ///
182 /// Convenience method that stores the reference and immediately
183 /// fetches the blob to ensure it's available locally.
184 async fn store_and_fetch<F>(
185 &self,
186 collection: &str,
187 doc_id: &str,
188 field: &str,
189 token: &BlobToken,
190 progress: F,
191 ) -> Result<std::path::PathBuf>
192 where
193 F: FnMut(BlobProgress) + Send + 'static;
194}
195
196// ============================================================================
197// Model Registry Helper Types (for ADR-022 integration)
198// ============================================================================
199
200/// Model variant blob reference with execution requirements
201#[derive(Clone, Debug, Serialize, Deserialize)]
202pub struct ModelVariantBlob {
203 /// Blob reference for this variant
204 pub blob: BlobReference,
205 /// Precision (e.g., "float32", "float16", "int8")
206 pub precision: String,
207 /// Supported execution providers (e.g., ["CUDAExecutionProvider", "CPUExecutionProvider"])
208 pub execution_providers: Vec<String>,
209 /// Minimum GPU memory required in GB (if applicable)
210 #[serde(skip_serializing_if = "Option::is_none")]
211 pub min_gpu_memory_gb: Option<f64>,
212}
213
214/// Model registry document with blob references
215///
216/// This schema matches ADR-022 model registry format with blob tokens.
217#[derive(Clone, Debug, Serialize, Deserialize)]
218pub struct ModelRegistryDocument {
219 /// Model identifier (e.g., "target_recognition")
220 pub model_id: String,
221 /// Semantic version (e.g., "4.2.1")
222 pub version: String,
223 /// Available model variants keyed by variant ID
224 pub variants: HashMap<String, ModelVariantBlob>,
225 /// Model provenance information
226 #[serde(skip_serializing_if = "Option::is_none")]
227 pub provenance: Option<ModelProvenance>,
228 /// Human-readable description
229 #[serde(skip_serializing_if = "Option::is_none")]
230 pub description: Option<String>,
231}
232
233/// Model provenance and signing information
234#[derive(Clone, Debug, Serialize, Deserialize)]
235pub struct ModelProvenance {
236 /// Entity that signed the model
237 pub signed_by: String,
238 /// Cryptographic signature
239 pub signature: String,
240 /// Timestamp of signing
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub signed_at: Option<String>,
243}
244
245impl ModelRegistryDocument {
246 /// Create a new model registry document
247 pub fn new(model_id: &str, version: &str) -> Self {
248 Self {
249 model_id: model_id.to_string(),
250 version: version.to_string(),
251 variants: HashMap::new(),
252 provenance: None,
253 description: None,
254 }
255 }
256
257 /// Add a model variant with blob reference
258 pub fn add_variant(
259 &mut self,
260 variant_id: &str,
261 token: &BlobToken,
262 precision: &str,
263 execution_providers: Vec<String>,
264 min_gpu_memory_gb: Option<f64>,
265 ) {
266 self.variants.insert(
267 variant_id.to_string(),
268 ModelVariantBlob {
269 blob: BlobReference::from(token),
270 precision: precision.to_string(),
271 execution_providers,
272 min_gpu_memory_gb,
273 },
274 );
275 }
276
277 /// Get document ID (model_id:version)
278 pub fn doc_id(&self) -> String {
279 format!("{}:{}", self.model_id, self.version)
280 }
281}
282
283// ============================================================================
284// Tests
285// ============================================================================
286
#[cfg(test)]
mod tests {
    use super::*;

    /// A token converted to a reference must serialize to JSON and convert
    /// back to an equivalent token.
    #[test]
    fn test_blob_reference_serialization() {
        let token = BlobToken {
            hash: BlobHash::from_hex("abc123def456"),
            size_bytes: 1024 * 1024,
            metadata: BlobMetadata {
                name: Some("test.onnx".to_string()),
                content_type: Some("application/onnx".to_string()),
                custom: HashMap::new(),
            },
        };

        let reference = BlobReference::from(&token);
        let json = serde_json::to_string_pretty(&reference).unwrap();

        // Verify it serializes correctly
        assert!(json.contains("abc123def456"));
        assert!(json.contains("1048576"));
        assert!(json.contains("test.onnx"));

        // Verify round-trip
        let deserialized: BlobReference = serde_json::from_str(&json).unwrap();
        let token_back = BlobToken::from(deserialized);

        assert_eq!(token_back.hash.as_hex(), token.hash.as_hex());
        assert_eq!(token_back.size_bytes, token.size_bytes);
        assert_eq!(token_back.metadata.name, token.metadata.name);
        assert_eq!(
            token_back.metadata.content_type,
            token.metadata.content_type
        );
    }

    /// A registry document must build its ID, serialize, and come back from
    /// JSON with its variant intact.
    #[test]
    fn test_model_registry_document() {
        let token = BlobToken {
            hash: BlobHash::from_hex("sha256:abc123"),
            size_bytes: 500_000_000,
            metadata: BlobMetadata::with_name("target_recognition_fp32.onnx"),
        };

        let mut doc = ModelRegistryDocument::new("target_recognition", "4.2.1");
        doc.add_variant(
            "fp32_cuda",
            &token,
            "float32",
            vec!["CUDAExecutionProvider".to_string()],
            Some(4.0),
        );

        assert_eq!(doc.doc_id(), "target_recognition:4.2.1");
        assert!(doc.variants.contains_key("fp32_cuda"));

        let json = serde_json::to_string_pretty(&doc).unwrap();
        assert!(json.contains("target_recognition"));
        assert!(json.contains("CUDAExecutionProvider"));

        // Documents are read back by peers after sync, so the deserialize
        // path matters as much as the serialize path.
        let restored: ModelRegistryDocument = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.doc_id(), doc.doc_id());
        assert_eq!(restored.variants.len(), 1);

        let variant = &restored.variants["fp32_cuda"];
        assert_eq!(variant.blob.hash, doc.variants["fp32_cuda"].blob.hash);
        assert_eq!(variant.blob.size_bytes, 500_000_000);
        assert_eq!(variant.precision, "float32");
        assert_eq!(
            variant.execution_providers,
            vec!["CUDAExecutionProvider".to_string()]
        );
        assert_eq!(variant.min_gpu_memory_gb, Some(4.0));

        // Optional fields that were never set are skipped on output and
        // must come back as `None`.
        assert!(restored.provenance.is_none());
        assert!(restored.description.is_none());
    }

    /// Custom metadata must survive both the JSON round-trip and the
    /// conversion back into a token.
    #[test]
    fn test_blob_reference_with_custom_metadata() {
        let mut custom = HashMap::new();
        custom.insert("training_date".to_string(), "2025-01-15".to_string());
        custom.insert("accuracy".to_string(), "0.95".to_string());

        let token = BlobToken {
            hash: BlobHash::from_hex("deadbeef"),
            size_bytes: 100,
            metadata: BlobMetadata {
                name: Some("model.onnx".to_string()),
                content_type: None,
                custom,
            },
        };

        let reference = BlobReference::from(&token);
        let json = serde_json::to_string(&reference).unwrap();

        // Custom fields should be present
        assert!(json.contains("training_date"));
        assert!(json.contains("accuracy"));

        // An unset content type is skipped rather than written as null
        assert!(!json.contains("content_type"));

        // Round-trip should preserve custom fields
        let deserialized: BlobReference = serde_json::from_str(&json).unwrap();
        assert_eq!(
            deserialized.metadata.custom.get("training_date"),
            Some(&"2025-01-15".to_string())
        );
        assert_eq!(
            deserialized.metadata.custom.get("accuracy"),
            Some(&"0.95".to_string())
        );

        // Converting back to a token must keep the whole custom map
        let token_back = BlobToken::from(deserialized);
        assert_eq!(token_back.metadata.custom, token.metadata.custom);
        assert!(token_back.metadata.content_type.is_none());
    }
}
375}