Skip to main content

tensor_blob/
streaming.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use tensor_store::{ScalarValue, TensorData, TensorStore, TensorValue};
5
6use crate::{
7    chunker::{Chunk, Chunker, StreamingHasher},
8    error::{BlobError, Result},
9    gc::increment_chunk_refs,
10    metadata::PutOptions,
11};
12
/// Internal state shared between `BlobWriter` and the store.
///
/// Holds the artifact identity plus the metadata captured from `PutOptions`
/// when the writer is created. When the metadata tensor is built in
/// `finish`, the owned fields are moved out with `std::mem::take`.
#[derive(Debug, Clone, Default)]
pub struct WriteState {
    /// Identifier the artifact's metadata is stored under (`_blob:meta:<id>`).
    pub artifact_id: String,
    /// Original file name, recorded as `_filename`.
    pub filename: String,
    /// Content type (e.g. `text/plain`); also used for the content-type index.
    pub content_type: String,
    /// Creator recorded as `_created_by` (empty when not supplied).
    pub created_by: String,
    /// Entity keys the artifact is linked to; each gets a link index entry.
    pub linked_to: Vec<String>,
    /// Tags attached to the artifact; each gets a tag index entry.
    pub tags: Vec<String>,
    /// Free-form key/value pairs stored as `_meta:<key>` fields.
    pub custom_metadata: std::collections::HashMap<String, String>,
    /// Optional embedding vector and the name of the model that produced it.
    pub embedding: Option<(Vec<f32>, String)>,
}
24
25/// Streaming writer for uploading artifacts.
26pub struct BlobWriter {
27    store: TensorStore,
28    chunker: Chunker,
29    state: WriteState,
30    chunks: Vec<String>,
31    total_size: usize,
32    hasher: StreamingHasher,
33    buffer: Vec<u8>,
34}
35
36impl BlobWriter {
37    pub(crate) fn new(
38        store: TensorStore,
39        chunk_size: usize,
40        artifact_id: String,
41        filename: String,
42        options: PutOptions,
43        default_content_type: &str,
44    ) -> Self {
45        Self {
46            store,
47            chunker: Chunker::new(chunk_size),
48            state: WriteState {
49                artifact_id,
50                filename,
51                content_type: options
52                    .content_type
53                    .unwrap_or_else(|| default_content_type.to_string()),
54                created_by: options.created_by.unwrap_or_default(),
55                linked_to: options.linked_to,
56                tags: options.tags,
57                custom_metadata: options.metadata,
58                embedding: options.embedding,
59            },
60            chunks: Vec::new(),
61            total_size: 0,
62            hasher: StreamingHasher::new(),
63            buffer: Vec::new(),
64        }
65    }
66
67    /// Write data to the artifact. Data is chunked and stored incrementally.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if chunk storage fails.
72    #[allow(clippy::unused_async)]
73    pub async fn write(&mut self, data: &[u8]) -> Result<()> {
74        if data.is_empty() {
75            return Ok(());
76        }
77
78        // Update the full-file hash
79        self.hasher.update(data);
80        self.total_size += data.len();
81
82        // Add to buffer
83        self.buffer.extend_from_slice(data);
84
85        // Process complete chunks
86        while self.buffer.len() >= self.chunker.chunk_size() {
87            let chunk_data: Vec<u8> = self.buffer.drain(..self.chunker.chunk_size()).collect();
88            let chunk = Chunk::new(chunk_data);
89            self.store_chunk(chunk)?;
90        }
91
92        Ok(())
93    }
94
95    /// Store a chunk, handling deduplication.
96    fn store_chunk(&mut self, chunk: Chunk) -> Result<()> {
97        let chunk_key = chunk.key();
98
99        // Check if chunk already exists (deduplication)
100        if self.store.exists(&chunk_key) {
101            // Increment reference count
102            increment_chunk_refs(&self.store, &chunk_key)?;
103        } else {
104            // Store new chunk
105            let mut tensor = TensorData::new();
106            tensor.set(
107                "_type",
108                TensorValue::Scalar(ScalarValue::String("blob_chunk".to_string())),
109            );
110            tensor.set("_data", TensorValue::Scalar(ScalarValue::Bytes(chunk.data)));
111            tensor.set(
112                "_size",
113                TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunk.size).unwrap_or(0))),
114            );
115            tensor.set("_refs", TensorValue::Scalar(ScalarValue::Int(1)));
116            tensor.set(
117                "_created",
118                TensorValue::Scalar(ScalarValue::Int(
119                    i64::try_from(current_timestamp()).unwrap_or(0),
120                )),
121            );
122
123            self.store.put(&chunk_key, tensor)?;
124        }
125
126        self.chunks.push(chunk_key);
127        Ok(())
128    }
129
130    /// Finalize the artifact and return its ID.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if metadata storage fails.
135    #[allow(clippy::unused_async)]
136    pub async fn finish(mut self) -> Result<String> {
137        // Flush remaining buffer
138        if !self.buffer.is_empty() {
139            let chunk = Chunk::new(std::mem::take(&mut self.buffer));
140            self.store_chunk(chunk)?;
141        }
142
143        let content_type_for_idx = self.state.content_type.clone();
144        let linked_to_for_idx = self.state.linked_to.clone();
145        let tags_for_idx = self.state.tags.clone();
146
147        let checksum = self.hasher.finalize();
148        let tensor = build_metadata_tensor(
149            &mut self.state,
150            &mut self.chunks,
151            self.total_size,
152            self.chunker.chunk_size(),
153            &checksum,
154        );
155
156        let meta_key = format!("_blob:meta:{}", self.state.artifact_id);
157        self.store.put(&meta_key, tensor)?;
158
159        Self::write_secondary_indexes(
160            &self.store,
161            &self.state.artifact_id,
162            &content_type_for_idx,
163            &linked_to_for_idx,
164            &tags_for_idx,
165        )?;
166
167        Ok(self.state.artifact_id)
168    }
169
170    /// Write secondary index entries for content type, links, and tags.
171    fn write_secondary_indexes(
172        store: &TensorStore,
173        artifact_id: &str,
174        content_type: &str,
175        linked_to: &[String],
176        tags: &[String],
177    ) -> Result<()> {
178        if !content_type.is_empty() {
179            let idx_key = format!("_blob:idx:ct:{content_type}:{artifact_id}");
180            store.put(&idx_key, TensorData::new())?;
181        }
182
183        for entity in linked_to {
184            let idx_key = format!("_blob:idx:link:{entity}:{artifact_id}");
185            store.put(&idx_key, TensorData::new())?;
186        }
187
188        for tag in tags {
189            let idx_key = format!("_blob:idx:tag:{tag}:{artifact_id}");
190            store.put(&idx_key, TensorData::new())?;
191        }
192
193        Ok(())
194    }
195
196    /// Get the current total size written.
197    #[must_use]
198    pub const fn bytes_written(&self) -> usize {
199        self.total_size
200    }
201
202    /// Get the number of chunks written so far.
203    #[must_use]
204    #[allow(clippy::missing_const_for_fn)]
205    pub fn chunks_written(&self) -> usize {
206        self.chunks.len()
207    }
208}
209
210/// Build the metadata tensor from the writer's accumulated state.
211///
212/// This is a free function rather than a method because `finish()` partially
213/// moves `self.hasher` before calling this, which prevents borrowing `self`.
214fn build_metadata_tensor(
215    state: &mut WriteState,
216    chunks: &mut Vec<String>,
217    total_size: usize,
218    chunk_size: usize,
219    checksum: &str,
220) -> TensorData {
221    let now = current_timestamp();
222    let mut tensor = TensorData::new();
223
224    tensor.set(
225        "_type",
226        TensorValue::Scalar(ScalarValue::String("blob_artifact".to_string())),
227    );
228    tensor.set(
229        "_id",
230        TensorValue::Scalar(ScalarValue::String(state.artifact_id.clone())),
231    );
232    tensor.set(
233        "_filename",
234        TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.filename))),
235    );
236    tensor.set(
237        "_content_type",
238        TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.content_type))),
239    );
240    tensor.set(
241        "_size",
242        TensorValue::Scalar(ScalarValue::Int(i64::try_from(total_size).unwrap_or(0))),
243    );
244    tensor.set(
245        "_checksum",
246        TensorValue::Scalar(ScalarValue::String(checksum.to_string())),
247    );
248    tensor.set(
249        "_chunk_size",
250        TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunk_size).unwrap_or(0))),
251    );
252    tensor.set(
253        "_chunk_count",
254        TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunks.len()).unwrap_or(0))),
255    );
256    tensor.set("_chunks", TensorValue::Pointers(std::mem::take(chunks)));
257    tensor.set(
258        "_created",
259        TensorValue::Scalar(ScalarValue::Int(i64::try_from(now).unwrap_or(0))),
260    );
261    tensor.set(
262        "_modified",
263        TensorValue::Scalar(ScalarValue::Int(i64::try_from(now).unwrap_or(0))),
264    );
265    tensor.set(
266        "_created_by",
267        TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.created_by))),
268    );
269
270    let linked_to = std::mem::take(&mut state.linked_to);
271    if !linked_to.is_empty() {
272        tensor.set("_linked_to", TensorValue::Pointers(linked_to));
273    }
274
275    let tags = std::mem::take(&mut state.tags);
276    if !tags.is_empty() {
277        tensor.set(
278            "_tags",
279            TensorValue::Pointers(tags.into_iter().map(|t| format!("tag:{t}")).collect()),
280        );
281    }
282
283    for (key, value) in std::mem::take(&mut state.custom_metadata) {
284        tensor.set(
285            format!("_meta:{key}"),
286            TensorValue::Scalar(ScalarValue::String(value)),
287        );
288    }
289
290    if let Some((embedding, model)) = state.embedding.take() {
291        use tensor_store::SparseVector;
292        let storage = if should_use_sparse(&embedding) {
293            TensorValue::Sparse(SparseVector::from_dense(&embedding))
294        } else {
295            TensorValue::Vector(embedding)
296        };
297        tensor.set("_embedding", storage);
298        tensor.set(
299            "_embedded_model",
300            TensorValue::Scalar(ScalarValue::String(model)),
301        );
302    }
303
304    tensor
305}
306
307/// Streaming reader for downloading artifacts.
308pub struct BlobReader {
309    store: TensorStore,
310    chunks: Vec<String>,
311    current_chunk: usize,
312    current_data: Option<Vec<u8>>,
313    current_offset: usize,
314    total_size: usize,
315    bytes_read: usize,
316    checksum: String,
317}
318
319impl BlobReader {
320    /// Creates a new reader for the specified artifact.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the artifact is not found.
325    pub(crate) fn new(store: TensorStore, artifact_id: &str) -> Result<Self> {
326        let meta_key = format!("_blob:meta:{artifact_id}");
327        let tensor = store
328            .get(&meta_key)
329            .map_err(|_| BlobError::NotFound(artifact_id.to_string()))?;
330
331        let chunks = get_pointers(&tensor, "_chunks")
332            .ok_or_else(|| BlobError::NotFound(format!("chunks for {artifact_id}")))?;
333        let total_size = usize::try_from(get_int(&tensor, "_size").unwrap_or(0)).unwrap_or(0);
334        let checksum = get_string(&tensor, "_checksum").unwrap_or_default();
335
336        Ok(Self {
337            store,
338            chunks,
339            current_chunk: 0,
340            current_data: None,
341            current_offset: 0,
342            total_size,
343            bytes_read: 0,
344            checksum,
345        })
346    }
347
348    /// Read the next chunk. Returns None when all chunks have been read.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if a chunk is missing.
353    #[allow(clippy::unused_async)]
354    pub async fn next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
355        if self.current_chunk >= self.chunks.len() {
356            return Ok(None);
357        }
358
359        let chunk_key = &self.chunks[self.current_chunk];
360        let tensor = self
361            .store
362            .get(chunk_key)
363            .map_err(|_| BlobError::ChunkMissing(chunk_key.clone()))?;
364
365        let data = get_bytes(&tensor, "_data")
366            .ok_or_else(|| BlobError::ChunkMissing(chunk_key.clone()))?;
367
368        self.current_chunk += 1;
369        self.bytes_read += data.len();
370
371        Ok(Some(data))
372    }
373
374    /// Read all remaining data into a single buffer.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if a chunk is missing.
379    pub async fn read_all(&mut self) -> Result<Vec<u8>> {
380        let mut result = Vec::with_capacity(self.total_size);
381
382        while let Some(chunk) = self.next_chunk().await? {
383            result.extend(chunk);
384        }
385
386        Ok(result)
387    }
388
389    /// Read into a buffer, returning number of bytes read.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if a chunk is missing.
394    ///
395    /// # Panics
396    ///
397    /// This method will not panic under normal conditions. The internal unwrap
398    /// is guarded by the preceding chunk load logic.
399    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
400        // Load chunk if needed
401        if self.current_data.is_none()
402            || self.current_offset >= self.current_data.as_ref().map_or(0, Vec::len)
403        {
404            match self.next_chunk().await? {
405                Some(data) => {
406                    self.current_data = Some(data);
407                    self.current_offset = 0;
408                },
409                None => return Ok(0),
410            }
411        }
412
413        let data = self.current_data.as_ref().unwrap();
414        let remaining = &data[self.current_offset..];
415        let to_copy = remaining.len().min(buf.len());
416
417        buf[..to_copy].copy_from_slice(&remaining[..to_copy]);
418        self.current_offset += to_copy;
419
420        Ok(to_copy)
421    }
422
423    /// Verify the artifact checksum.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if a chunk is missing.
428    pub async fn verify(&mut self) -> Result<bool> {
429        let mut hasher = StreamingHasher::new();
430
431        // Reset to start
432        self.current_chunk = 0;
433        self.bytes_read = 0;
434
435        while let Some(chunk) = self.next_chunk().await? {
436            hasher.update(&chunk);
437        }
438
439        let actual = hasher.finalize();
440        Ok(actual == self.checksum)
441    }
442
443    /// Get the expected checksum.
444    #[must_use]
445    pub fn checksum(&self) -> &str {
446        &self.checksum
447    }
448
449    /// Get total artifact size.
450    #[must_use]
451    pub const fn total_size(&self) -> usize {
452        self.total_size
453    }
454
455    /// Get bytes read so far.
456    #[must_use]
457    pub const fn bytes_read(&self) -> usize {
458        self.bytes_read
459    }
460
461    /// Get the number of chunks.
462    #[must_use]
463    #[allow(clippy::missing_const_for_fn)]
464    pub fn chunk_count(&self) -> usize {
465        self.chunks.len()
466    }
467}
468
469// Helper functions
470
/// Whole seconds since the Unix epoch, or 0 if the system clock reports a
/// time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
477
478pub fn get_int(tensor: &TensorData, field: &str) -> Option<i64> {
479    match tensor.get(field) {
480        Some(TensorValue::Scalar(ScalarValue::Int(i))) => Some(*i),
481        _ => None,
482    }
483}
484
485pub fn get_string(tensor: &TensorData, field: &str) -> Option<String> {
486    match tensor.get(field) {
487        Some(TensorValue::Scalar(ScalarValue::String(s))) => Some(s.clone()),
488        _ => None,
489    }
490}
491
492pub fn get_bytes(tensor: &TensorData, field: &str) -> Option<Vec<u8>> {
493    match tensor.get(field) {
494        Some(TensorValue::Scalar(ScalarValue::Bytes(b))) => Some(b.clone()),
495        _ => None,
496    }
497}
498
499pub fn get_pointers(tensor: &TensorData, field: &str) -> Option<Vec<String>> {
500    match tensor.get(field) {
501        Some(TensorValue::Pointers(p)) => Some(p.clone()),
502        _ => None,
503    }
504}
505
#[cfg(feature = "vector")]
/// Return the vector stored in `field` as a dense `Vec<f32>`, expanding
/// sparse storage. Returns `None` if the field is absent or is not a vector.
pub fn get_vector(tensor: &TensorData, field: &str) -> Option<Vec<f32>> {
    let value = tensor.get(field)?;
    match value {
        TensorValue::Vector(dense) => Some(dense.clone()),
        TensorValue::Sparse(sparse) => Some(sparse.to_dense()),
        _ => None,
    }
}
514
/// Check if a vector should use sparse storage (50% threshold).
///
/// A component counts as non-zero when its magnitude exceeds `1e-6`; NaN
/// components fail that comparison and are therefore counted as zero. An
/// empty vector is never stored sparsely.
pub fn should_use_sparse(vector: &[f32]) -> bool {
    const ZERO_EPSILON: f32 = 1e-6;
    let non_zero = vector
        .iter()
        .copied()
        .filter(|component| component.abs() > ZERO_EPSILON)
        .count();
    // Sparse when at most half the entries are non-zero: nnz <= len / 2.
    !vector.is_empty() && non_zero * 2 <= vector.len()
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::PutOptions;

    fn create_test_store() -> TensorStore {
        TensorStore::new()
    }

    /// Upload `data` through a fresh `BlobWriter` in a single `write` call and
    /// return the ID reported by `finish`.
    async fn upload(
        store: &TensorStore,
        chunk_size: usize,
        artifact_id: &str,
        filename: &str,
        options: PutOptions,
        default_content_type: &str,
        data: &[u8],
    ) -> String {
        let mut writer = BlobWriter::new(
            store.clone(),
            chunk_size,
            artifact_id.to_string(),
            filename.to_string(),
            options,
            default_content_type,
        );
        writer.write(data).await.unwrap();
        writer.finish().await.unwrap()
    }

    #[tokio::test]
    async fn test_blob_writer_small_file() {
        let store = create_test_store();
        let artifact_id = upload(
            &store,
            1024,
            "test-artifact",
            "test.txt",
            PutOptions::default(),
            "text/plain",
            b"hello world",
        )
        .await;
        assert_eq!(artifact_id, "test-artifact");

        // Metadata records the filename, byte size and a single chunk.
        let meta = store.get(&format!("_blob:meta:{artifact_id}")).unwrap();
        assert_eq!(get_string(&meta, "_filename"), Some("test.txt".to_string()));
        assert_eq!(get_int(&meta, "_size"), Some(11));
        assert_eq!(get_int(&meta, "_chunk_count"), Some(1));
    }

    #[tokio::test]
    async fn test_blob_writer_multi_chunk() {
        let store = create_test_store();
        // 25 bytes at chunk size 10 split into 10 + 10 + 5.
        let artifact_id = upload(
            &store,
            10,
            "multi-chunk",
            "data.bin",
            PutOptions::default(),
            "application/octet-stream",
            &[0u8; 25],
        )
        .await;

        let meta = store.get(&format!("_blob:meta:{artifact_id}")).unwrap();
        assert_eq!(get_int(&meta, "_chunk_count"), Some(3));
        assert_eq!(get_int(&meta, "_size"), Some(25));
    }

    #[tokio::test]
    async fn test_blob_writer_incremental_write() {
        let store = create_test_store();
        let mut writer = BlobWriter::new(
            store.clone(),
            10,
            "incremental".to_string(),
            "data.bin".to_string(),
            PutOptions::default(),
            "application/octet-stream",
        );

        // Several small writes that straddle a chunk boundary.
        for piece in [&[1u8, 2, 3][..], &[4, 5, 6], &[7, 8, 9, 10, 11, 12]] {
            writer.write(piece).await.unwrap();
        }
        let artifact_id = writer.finish().await.unwrap();

        let meta = store.get(&format!("_blob:meta:{artifact_id}")).unwrap();
        assert_eq!(get_int(&meta, "_size"), Some(12));
    }

    #[tokio::test]
    async fn test_blob_writer_with_options() {
        let store = create_test_store();
        let options = PutOptions::new()
            .with_content_type("application/pdf")
            .with_created_by("user:alice")
            .with_link("task:123")
            .with_tag("quarterly")
            .with_meta("author", "Alice");

        let artifact_id = upload(
            &store,
            1024,
            "with-options",
            "report.pdf",
            options,
            "application/octet-stream",
            b"PDF content",
        )
        .await;

        // Explicit options override the default content type.
        let meta = store.get(&format!("_blob:meta:{artifact_id}")).unwrap();
        assert_eq!(
            get_string(&meta, "_content_type"),
            Some("application/pdf".to_string())
        );
        assert_eq!(
            get_string(&meta, "_created_by"),
            Some("user:alice".to_string())
        );
        assert_eq!(
            get_string(&meta, "_meta:author"),
            Some("Alice".to_string())
        );
    }

    #[tokio::test]
    async fn test_blob_reader_small_file() {
        let store = create_test_store();
        upload(
            &store,
            1024,
            "read-test",
            "test.txt",
            PutOptions::default(),
            "text/plain",
            b"hello world",
        )
        .await;

        let mut reader = BlobReader::new(store, "read-test").unwrap();
        let contents = reader.read_all().await.unwrap();

        assert_eq!(contents, b"hello world");
        assert_eq!(reader.bytes_read(), 11);
    }

    #[tokio::test]
    async fn test_blob_reader_multi_chunk() {
        let store = create_test_store();
        let payload = vec![42u8; 25];
        upload(
            &store,
            10,
            "multi-read",
            "data.bin",
            PutOptions::default(),
            "application/octet-stream",
            &payload,
        )
        .await;

        let mut reader = BlobReader::new(store, "multi-read").unwrap();
        let contents = reader.read_all().await.unwrap();

        assert_eq!(contents, payload);
        assert_eq!(reader.chunk_count(), 3);
    }

    #[tokio::test]
    async fn test_blob_reader_chunk_by_chunk() {
        let store = create_test_store();
        // 30 bytes at chunk size 10 give exactly three full chunks.
        upload(
            &store,
            10,
            "chunk-read",
            "data.bin",
            PutOptions::default(),
            "application/octet-stream",
            &[1u8; 30],
        )
        .await;

        let mut reader = BlobReader::new(store, "chunk-read").unwrap();
        for _ in 0..3 {
            let chunk = reader.next_chunk().await.unwrap().unwrap();
            assert_eq!(chunk.len(), 10);
        }
        assert!(reader.next_chunk().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_blob_reader_verify() {
        let store = create_test_store();
        upload(
            &store,
            1024,
            "verify-test",
            "test.txt",
            PutOptions::default(),
            "text/plain",
            b"verification test data",
        )
        .await;

        let mut reader = BlobReader::new(store, "verify-test").unwrap();
        assert!(reader.verify().await.unwrap());
    }

    #[tokio::test]
    async fn test_blob_reader_not_found() {
        let store = create_test_store();
        let result = BlobReader::new(store, "nonexistent");
        assert!(matches!(result, Err(BlobError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_deduplication() {
        let store = create_test_store();
        let payload = vec![42u8; 10];

        // Two artifacts with identical content share one stored chunk.
        for (artifact_id, filename) in [("dedup-1", "file1.bin"), ("dedup-2", "file2.bin")] {
            upload(
                &store,
                10,
                artifact_id,
                filename,
                PutOptions::default(),
                "application/octet-stream",
                &payload,
            )
            .await;
        }

        let chunk_keys = store.scan("_blob:chunk:");
        assert_eq!(chunk_keys.len(), 1);

        // ...whose reference count reflects both uploads.
        let chunk = store.get(&chunk_keys[0]).unwrap();
        assert_eq!(get_int(&chunk, "_refs"), Some(2));
    }
}