1use std::time::{SystemTime, UNIX_EPOCH};
3
4use tensor_store::{ScalarValue, TensorData, TensorStore, TensorValue};
5
6use crate::{
7 chunker::{Chunk, Chunker, StreamingHasher},
8 error::{BlobError, Result},
9 gc::increment_chunk_refs,
10 metadata::PutOptions,
11};
12
/// Artifact-level attributes gathered when a [`BlobWriter`] is created and
/// carried along until the metadata record is written.
///
/// Every field is a plain standard-library type, so the struct derives
/// `Debug` and `Clone` to make it loggable and copyable by callers.
#[derive(Debug, Clone)]
pub struct WriteState {
    /// Identifier of the artifact; the metadata record is stored under
    /// `_blob:meta:<artifact_id>`.
    pub artifact_id: String,
    /// File name recorded in the metadata record as `_filename`.
    pub filename: String,
    /// Content type recorded as `_content_type` and used for the
    /// content-type index entry.
    pub content_type: String,
    /// Creator recorded as `_created_by`; empty when none was supplied.
    pub created_by: String,
    /// Entities this artifact is linked to (recorded as `_linked_to`).
    pub linked_to: Vec<String>,
    /// Tags attached to the artifact (recorded as `_tags`).
    pub tags: Vec<String>,
    /// Free-form key/value pairs, each recorded as `_meta:<key>`.
    pub custom_metadata: std::collections::HashMap<String, String>,
    /// Optional embedding vector together with the name of the model that
    /// produced it (recorded as `_embedding` / `_embedded_model`).
    pub embedding: Option<(Vec<f32>, String)>,
}
24
/// Streaming writer that splits incoming bytes into chunks, stores each chunk
/// in a [`TensorStore`], and writes the artifact's metadata record when
/// finished.
pub struct BlobWriter {
    /// Store that receives chunk records, the metadata record and the
    /// secondary index entries.
    store: TensorStore,
    /// Supplies the chunk size used to cut the byte stream.
    chunker: Chunker,
    /// Artifact attributes that end up in the metadata record.
    state: WriteState,
    /// Keys of the chunks stored so far, in stream order.
    chunks: Vec<String>,
    /// Total number of bytes passed to `write` so far.
    total_size: usize,
    /// Running hash over every byte passed to `write`.
    hasher: StreamingHasher,
    /// Bytes received but not yet emitted as a chunk.
    buffer: Vec<u8>,
}
35
36impl BlobWriter {
37 pub(crate) fn new(
38 store: TensorStore,
39 chunk_size: usize,
40 artifact_id: String,
41 filename: String,
42 options: PutOptions,
43 default_content_type: &str,
44 ) -> Self {
45 Self {
46 store,
47 chunker: Chunker::new(chunk_size),
48 state: WriteState {
49 artifact_id,
50 filename,
51 content_type: options
52 .content_type
53 .unwrap_or_else(|| default_content_type.to_string()),
54 created_by: options.created_by.unwrap_or_default(),
55 linked_to: options.linked_to,
56 tags: options.tags,
57 custom_metadata: options.metadata,
58 embedding: options.embedding,
59 },
60 chunks: Vec::new(),
61 total_size: 0,
62 hasher: StreamingHasher::new(),
63 buffer: Vec::new(),
64 }
65 }
66
67 #[allow(clippy::unused_async)]
73 pub async fn write(&mut self, data: &[u8]) -> Result<()> {
74 if data.is_empty() {
75 return Ok(());
76 }
77
78 self.hasher.update(data);
80 self.total_size += data.len();
81
82 self.buffer.extend_from_slice(data);
84
85 while self.buffer.len() >= self.chunker.chunk_size() {
87 let chunk_data: Vec<u8> = self.buffer.drain(..self.chunker.chunk_size()).collect();
88 let chunk = Chunk::new(chunk_data);
89 self.store_chunk(chunk)?;
90 }
91
92 Ok(())
93 }
94
95 fn store_chunk(&mut self, chunk: Chunk) -> Result<()> {
97 let chunk_key = chunk.key();
98
99 if self.store.exists(&chunk_key) {
101 increment_chunk_refs(&self.store, &chunk_key)?;
103 } else {
104 let mut tensor = TensorData::new();
106 tensor.set(
107 "_type",
108 TensorValue::Scalar(ScalarValue::String("blob_chunk".to_string())),
109 );
110 tensor.set("_data", TensorValue::Scalar(ScalarValue::Bytes(chunk.data)));
111 tensor.set(
112 "_size",
113 TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunk.size).unwrap_or(0))),
114 );
115 tensor.set("_refs", TensorValue::Scalar(ScalarValue::Int(1)));
116 tensor.set(
117 "_created",
118 TensorValue::Scalar(ScalarValue::Int(
119 i64::try_from(current_timestamp()).unwrap_or(0),
120 )),
121 );
122
123 self.store.put(&chunk_key, tensor)?;
124 }
125
126 self.chunks.push(chunk_key);
127 Ok(())
128 }
129
130 #[allow(clippy::unused_async)]
136 pub async fn finish(mut self) -> Result<String> {
137 if !self.buffer.is_empty() {
139 let chunk = Chunk::new(std::mem::take(&mut self.buffer));
140 self.store_chunk(chunk)?;
141 }
142
143 let content_type_for_idx = self.state.content_type.clone();
144 let linked_to_for_idx = self.state.linked_to.clone();
145 let tags_for_idx = self.state.tags.clone();
146
147 let checksum = self.hasher.finalize();
148 let tensor = build_metadata_tensor(
149 &mut self.state,
150 &mut self.chunks,
151 self.total_size,
152 self.chunker.chunk_size(),
153 &checksum,
154 );
155
156 let meta_key = format!("_blob:meta:{}", self.state.artifact_id);
157 self.store.put(&meta_key, tensor)?;
158
159 Self::write_secondary_indexes(
160 &self.store,
161 &self.state.artifact_id,
162 &content_type_for_idx,
163 &linked_to_for_idx,
164 &tags_for_idx,
165 )?;
166
167 Ok(self.state.artifact_id)
168 }
169
170 fn write_secondary_indexes(
172 store: &TensorStore,
173 artifact_id: &str,
174 content_type: &str,
175 linked_to: &[String],
176 tags: &[String],
177 ) -> Result<()> {
178 if !content_type.is_empty() {
179 let idx_key = format!("_blob:idx:ct:{content_type}:{artifact_id}");
180 store.put(&idx_key, TensorData::new())?;
181 }
182
183 for entity in linked_to {
184 let idx_key = format!("_blob:idx:link:{entity}:{artifact_id}");
185 store.put(&idx_key, TensorData::new())?;
186 }
187
188 for tag in tags {
189 let idx_key = format!("_blob:idx:tag:{tag}:{artifact_id}");
190 store.put(&idx_key, TensorData::new())?;
191 }
192
193 Ok(())
194 }
195
196 #[must_use]
198 pub const fn bytes_written(&self) -> usize {
199 self.total_size
200 }
201
202 #[must_use]
204 #[allow(clippy::missing_const_for_fn)]
205 pub fn chunks_written(&self) -> usize {
206 self.chunks.len()
207 }
208}
209
210fn build_metadata_tensor(
215 state: &mut WriteState,
216 chunks: &mut Vec<String>,
217 total_size: usize,
218 chunk_size: usize,
219 checksum: &str,
220) -> TensorData {
221 let now = current_timestamp();
222 let mut tensor = TensorData::new();
223
224 tensor.set(
225 "_type",
226 TensorValue::Scalar(ScalarValue::String("blob_artifact".to_string())),
227 );
228 tensor.set(
229 "_id",
230 TensorValue::Scalar(ScalarValue::String(state.artifact_id.clone())),
231 );
232 tensor.set(
233 "_filename",
234 TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.filename))),
235 );
236 tensor.set(
237 "_content_type",
238 TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.content_type))),
239 );
240 tensor.set(
241 "_size",
242 TensorValue::Scalar(ScalarValue::Int(i64::try_from(total_size).unwrap_or(0))),
243 );
244 tensor.set(
245 "_checksum",
246 TensorValue::Scalar(ScalarValue::String(checksum.to_string())),
247 );
248 tensor.set(
249 "_chunk_size",
250 TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunk_size).unwrap_or(0))),
251 );
252 tensor.set(
253 "_chunk_count",
254 TensorValue::Scalar(ScalarValue::Int(i64::try_from(chunks.len()).unwrap_or(0))),
255 );
256 tensor.set("_chunks", TensorValue::Pointers(std::mem::take(chunks)));
257 tensor.set(
258 "_created",
259 TensorValue::Scalar(ScalarValue::Int(i64::try_from(now).unwrap_or(0))),
260 );
261 tensor.set(
262 "_modified",
263 TensorValue::Scalar(ScalarValue::Int(i64::try_from(now).unwrap_or(0))),
264 );
265 tensor.set(
266 "_created_by",
267 TensorValue::Scalar(ScalarValue::String(std::mem::take(&mut state.created_by))),
268 );
269
270 let linked_to = std::mem::take(&mut state.linked_to);
271 if !linked_to.is_empty() {
272 tensor.set("_linked_to", TensorValue::Pointers(linked_to));
273 }
274
275 let tags = std::mem::take(&mut state.tags);
276 if !tags.is_empty() {
277 tensor.set(
278 "_tags",
279 TensorValue::Pointers(tags.into_iter().map(|t| format!("tag:{t}")).collect()),
280 );
281 }
282
283 for (key, value) in std::mem::take(&mut state.custom_metadata) {
284 tensor.set(
285 format!("_meta:{key}"),
286 TensorValue::Scalar(ScalarValue::String(value)),
287 );
288 }
289
290 if let Some((embedding, model)) = state.embedding.take() {
291 use tensor_store::SparseVector;
292 let storage = if should_use_sparse(&embedding) {
293 TensorValue::Sparse(SparseVector::from_dense(&embedding))
294 } else {
295 TensorValue::Vector(embedding)
296 };
297 tensor.set("_embedding", storage);
298 tensor.set(
299 "_embedded_model",
300 TensorValue::Scalar(ScalarValue::String(model)),
301 );
302 }
303
304 tensor
305}
306
/// Streaming reader that fetches an artifact's chunks from a [`TensorStore`]
/// in the order listed in its metadata record.
pub struct BlobReader {
    /// Store the metadata record and chunk records are read from.
    store: TensorStore,
    /// Chunk keys taken from the metadata record's `_chunks` field.
    chunks: Vec<String>,
    /// Index into `chunks` of the next chunk to fetch.
    current_chunk: usize,
    /// Chunk currently being handed out piecewise by `read`, if any.
    current_data: Option<Vec<u8>>,
    /// Number of bytes of `current_data` already handed out by `read`.
    current_offset: usize,
    /// Artifact size taken from the metadata record's `_size` field
    /// (0 when missing or not representable as `usize`).
    total_size: usize,
    /// Sum of the lengths of the chunks fetched so far.
    bytes_read: usize,
    /// Checksum taken from the metadata record's `_checksum` field
    /// (empty when missing).
    checksum: String,
}
318
319impl BlobReader {
320 pub(crate) fn new(store: TensorStore, artifact_id: &str) -> Result<Self> {
326 let meta_key = format!("_blob:meta:{artifact_id}");
327 let tensor = store
328 .get(&meta_key)
329 .map_err(|_| BlobError::NotFound(artifact_id.to_string()))?;
330
331 let chunks = get_pointers(&tensor, "_chunks")
332 .ok_or_else(|| BlobError::NotFound(format!("chunks for {artifact_id}")))?;
333 let total_size = usize::try_from(get_int(&tensor, "_size").unwrap_or(0)).unwrap_or(0);
334 let checksum = get_string(&tensor, "_checksum").unwrap_or_default();
335
336 Ok(Self {
337 store,
338 chunks,
339 current_chunk: 0,
340 current_data: None,
341 current_offset: 0,
342 total_size,
343 bytes_read: 0,
344 checksum,
345 })
346 }
347
348 #[allow(clippy::unused_async)]
354 pub async fn next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
355 if self.current_chunk >= self.chunks.len() {
356 return Ok(None);
357 }
358
359 let chunk_key = &self.chunks[self.current_chunk];
360 let tensor = self
361 .store
362 .get(chunk_key)
363 .map_err(|_| BlobError::ChunkMissing(chunk_key.clone()))?;
364
365 let data = get_bytes(&tensor, "_data")
366 .ok_or_else(|| BlobError::ChunkMissing(chunk_key.clone()))?;
367
368 self.current_chunk += 1;
369 self.bytes_read += data.len();
370
371 Ok(Some(data))
372 }
373
374 pub async fn read_all(&mut self) -> Result<Vec<u8>> {
380 let mut result = Vec::with_capacity(self.total_size);
381
382 while let Some(chunk) = self.next_chunk().await? {
383 result.extend(chunk);
384 }
385
386 Ok(result)
387 }
388
389 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
400 if self.current_data.is_none()
402 || self.current_offset >= self.current_data.as_ref().map_or(0, Vec::len)
403 {
404 match self.next_chunk().await? {
405 Some(data) => {
406 self.current_data = Some(data);
407 self.current_offset = 0;
408 },
409 None => return Ok(0),
410 }
411 }
412
413 let data = self.current_data.as_ref().unwrap();
414 let remaining = &data[self.current_offset..];
415 let to_copy = remaining.len().min(buf.len());
416
417 buf[..to_copy].copy_from_slice(&remaining[..to_copy]);
418 self.current_offset += to_copy;
419
420 Ok(to_copy)
421 }
422
423 pub async fn verify(&mut self) -> Result<bool> {
429 let mut hasher = StreamingHasher::new();
430
431 self.current_chunk = 0;
433 self.bytes_read = 0;
434
435 while let Some(chunk) = self.next_chunk().await? {
436 hasher.update(&chunk);
437 }
438
439 let actual = hasher.finalize();
440 Ok(actual == self.checksum)
441 }
442
443 #[must_use]
445 pub fn checksum(&self) -> &str {
446 &self.checksum
447 }
448
449 #[must_use]
451 pub const fn total_size(&self) -> usize {
452 self.total_size
453 }
454
455 #[must_use]
457 pub const fn bytes_read(&self) -> usize {
458 self.bytes_read
459 }
460
461 #[must_use]
463 #[allow(clippy::missing_const_for_fn)]
464 pub fn chunk_count(&self) -> usize {
465 self.chunks.len()
466 }
467}
468
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
477
478pub fn get_int(tensor: &TensorData, field: &str) -> Option<i64> {
479 match tensor.get(field) {
480 Some(TensorValue::Scalar(ScalarValue::Int(i))) => Some(*i),
481 _ => None,
482 }
483}
484
485pub fn get_string(tensor: &TensorData, field: &str) -> Option<String> {
486 match tensor.get(field) {
487 Some(TensorValue::Scalar(ScalarValue::String(s))) => Some(s.clone()),
488 _ => None,
489 }
490}
491
492pub fn get_bytes(tensor: &TensorData, field: &str) -> Option<Vec<u8>> {
493 match tensor.get(field) {
494 Some(TensorValue::Scalar(ScalarValue::Bytes(b))) => Some(b.clone()),
495 _ => None,
496 }
497}
498
499pub fn get_pointers(tensor: &TensorData, field: &str) -> Option<Vec<String>> {
500 match tensor.get(field) {
501 Some(TensorValue::Pointers(p)) => Some(p.clone()),
502 _ => None,
503 }
504}
505
/// Returns the vector stored under `field` in dense form, or `None` when the
/// field is absent or holds neither a dense nor a sparse vector.
///
/// Dense vectors are cloned; sparse vectors are expanded with `to_dense`.
#[cfg(feature = "vector")]
pub fn get_vector(tensor: &TensorData, field: &str) -> Option<Vec<f32>> {
    let stored = tensor.get(field);
    if let Some(TensorValue::Vector(dense)) = stored {
        return Some(dense.clone());
    }
    if let Some(TensorValue::Sparse(sparse)) = stored {
        return Some(sparse.to_dense());
    }
    None
}
514
/// Decides whether `vector` should be stored in sparse form.
///
/// A component counts as non-zero when its absolute value exceeds `1e-6`.
/// Sparse storage is chosen when at most half of the components are
/// non-zero. An empty vector is never sparse.
pub fn should_use_sparse(vector: &[f32]) -> bool {
    let mut significant = 0usize;
    for component in vector {
        if component.abs() > 1e-6 {
            significant += 1;
        }
    }

    !vector.is_empty() && significant * 2 <= vector.len()
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::PutOptions;

    /// Content type used by the binary-payload tests.
    const OCTET_STREAM: &str = "application/octet-stream";

    fn create_test_store() -> TensorStore {
        TensorStore::new()
    }

    /// Builds a writer with default options on a clone of `store`.
    fn writer_with_defaults(
        store: &TensorStore,
        chunk_size: usize,
        artifact_id: &str,
        filename: &str,
        content_type: &str,
    ) -> BlobWriter {
        BlobWriter::new(
            store.clone(),
            chunk_size,
            artifact_id.to_string(),
            filename.to_string(),
            PutOptions::default(),
            content_type,
        )
    }

    /// Writes `payload` in a single call, finishes the writer and returns the
    /// artifact id.
    async fn put_blob(
        store: &TensorStore,
        chunk_size: usize,
        artifact_id: &str,
        filename: &str,
        content_type: &str,
        payload: &[u8],
    ) -> String {
        let mut writer =
            writer_with_defaults(store, chunk_size, artifact_id, filename, content_type);
        writer.write(payload).await.unwrap();
        writer.finish().await.unwrap()
    }

    /// Loads the metadata record of `artifact_id`.
    fn metadata_of(store: &TensorStore, artifact_id: &str) -> TensorData {
        let meta_key = format!("_blob:meta:{artifact_id}");
        store.get(&meta_key).unwrap()
    }

    #[tokio::test]
    async fn test_blob_writer_small_file() {
        let store = create_test_store();

        let artifact_id = put_blob(
            &store,
            1024,
            "test-artifact",
            "test.txt",
            "text/plain",
            b"hello world",
        )
        .await;
        assert_eq!(artifact_id, "test-artifact");

        let meta = metadata_of(&store, &artifact_id);
        assert_eq!(
            get_string(&meta, "_filename"),
            Some("test.txt".to_string())
        );
        assert_eq!(get_int(&meta, "_size"), Some(11));
        assert_eq!(get_int(&meta, "_chunk_count"), Some(1));
    }

    #[tokio::test]
    async fn test_blob_writer_multi_chunk() {
        let store = create_test_store();
        let chunk_size = 10;

        let artifact_id = put_blob(
            &store,
            chunk_size,
            "multi-chunk",
            "data.bin",
            OCTET_STREAM,
            &[0u8; 25],
        )
        .await;

        let meta = metadata_of(&store, &artifact_id);
        assert_eq!(get_int(&meta, "_chunk_count"), Some(3));
        assert_eq!(get_int(&meta, "_size"), Some(25));
    }

    #[tokio::test]
    async fn test_blob_writer_incremental_write() {
        let store = create_test_store();
        let mut writer = writer_with_defaults(&store, 10, "incremental", "data.bin", OCTET_STREAM);

        let parts: [&[u8]; 3] = [&[1, 2, 3], &[4, 5, 6], &[7, 8, 9, 10, 11, 12]];
        for part in parts {
            writer.write(part).await.unwrap();
        }

        let artifact_id = writer.finish().await.unwrap();

        let meta = metadata_of(&store, &artifact_id);
        assert_eq!(get_int(&meta, "_size"), Some(12));
    }

    #[tokio::test]
    async fn test_blob_writer_with_options() {
        let store = create_test_store();
        let options = PutOptions::new()
            .with_content_type("application/pdf")
            .with_created_by("user:alice")
            .with_link("task:123")
            .with_tag("quarterly")
            .with_meta("author", "Alice");

        let mut writer = BlobWriter::new(
            store.clone(),
            1024,
            "with-options".to_string(),
            "report.pdf".to_string(),
            options,
            OCTET_STREAM,
        );

        writer.write(b"PDF content").await.unwrap();
        let artifact_id = writer.finish().await.unwrap();

        let meta = metadata_of(&store, &artifact_id);
        let expectations = [
            ("_content_type", "application/pdf"),
            ("_created_by", "user:alice"),
            ("_meta:author", "Alice"),
        ];
        for (field, expected) in expectations {
            assert_eq!(get_string(&meta, field), Some(expected.to_string()));
        }
    }

    #[tokio::test]
    async fn test_blob_reader_small_file() {
        let store = create_test_store();
        put_blob(
            &store,
            1024,
            "read-test",
            "test.txt",
            "text/plain",
            b"hello world",
        )
        .await;

        let mut reader = BlobReader::new(store, "read-test").unwrap();
        let data = reader.read_all().await.unwrap();

        assert_eq!(data, b"hello world");
        assert_eq!(reader.bytes_read(), 11);
    }

    #[tokio::test]
    async fn test_blob_reader_multi_chunk() {
        let store = create_test_store();
        let data = vec![42u8; 25];
        put_blob(&store, 10, "multi-read", "data.bin", OCTET_STREAM, &data).await;

        let mut reader = BlobReader::new(store, "multi-read").unwrap();
        let result = reader.read_all().await.unwrap();

        assert_eq!(result, data);
        assert_eq!(reader.chunk_count(), 3);
    }

    #[tokio::test]
    async fn test_blob_reader_chunk_by_chunk() {
        let store = create_test_store();
        put_blob(
            &store,
            10,
            "chunk-read",
            "data.bin",
            OCTET_STREAM,
            &[1u8; 30],
        )
        .await;

        let mut reader = BlobReader::new(store, "chunk-read").unwrap();
        let chunk1 = reader.next_chunk().await.unwrap().unwrap();
        let chunk2 = reader.next_chunk().await.unwrap().unwrap();
        let chunk3 = reader.next_chunk().await.unwrap().unwrap();
        let chunk4 = reader.next_chunk().await.unwrap();

        assert_eq!(chunk1.len(), 10);
        assert_eq!(chunk2.len(), 10);
        assert_eq!(chunk3.len(), 10);
        assert!(chunk4.is_none());
    }

    #[tokio::test]
    async fn test_blob_reader_verify() {
        let store = create_test_store();
        let data = b"verification test data";
        put_blob(&store, 1024, "verify-test", "test.txt", "text/plain", data).await;

        let mut reader = BlobReader::new(store, "verify-test").unwrap();
        let valid = reader.verify().await.unwrap();
        assert!(valid);
    }

    #[tokio::test]
    async fn test_blob_reader_not_found() {
        let store = create_test_store();
        let result = BlobReader::new(store, "nonexistent");
        assert!(matches!(result, Err(BlobError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_deduplication() {
        let store = create_test_store();
        let data = vec![42u8; 10];

        // Two artifacts with identical content must share one chunk record.
        put_blob(&store, 10, "dedup-1", "file1.bin", OCTET_STREAM, &data).await;
        put_blob(&store, 10, "dedup-2", "file2.bin", OCTET_STREAM, &data).await;

        let chunk_count = store.scan("_blob:chunk:").len();
        assert_eq!(chunk_count, 1);

        let chunks = store.scan("_blob:chunk:");
        let chunk = store.get(&chunks[0]).unwrap();
        assert_eq!(get_int(&chunk, "_refs"), Some(2));
    }
}