Skip to main content

cognee_storage/
storage_trait.rs

1use async_trait::async_trait;
2use std::path::PathBuf;
3use thiserror::Error;
4use tokio::fs::File;
5use tokio::io::{AsyncRead, AsyncWriteExt};
6
/// Errors returned by the storage API in this module.
///
/// Every variant carries a human-readable detail message that is interpolated
/// into the `Display` output. `PartialEq`/`Eq` are derived so that callers can
/// compare errors directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StorageError {
    /// Nothing exists at the requested location.
    #[error("Not found: {0}")]
    NotFound(String),

    /// An I/O operation failed; the payload describes the failure.
    #[error("IO error: {0}")]
    IoError(String),

    /// Access was refused.
    #[error("Permission denied: {0}")]
    PermissionDenied(String),

    /// The supplied path or location is not acceptable.
    #[error("Invalid path: {0}")]
    InvalidPath(String),
}
21
22/// A writer for storing data in chunks
23/// This allows efficient streaming writes without loading entire content into memory
24pub struct StorageWriter {
25    file: File,
26    location: String,
27}
28
29impl StorageWriter {
30    pub(crate) fn new(file: File, location: String) -> Self {
31        Self { file, location }
32    }
33
34    /// Write a chunk of data to storage
35    pub async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), StorageError> {
36        self.file
37            .write_all(chunk)
38            .await
39            .map_err(|e| StorageError::IoError(format!("Failed to write chunk: {e}")))
40    }
41
42    /// Finish writing and return the storage location
43    pub async fn finish(mut self) -> Result<String, StorageError> {
44        self.file
45            .flush()
46            .await
47            .map_err(|e| StorageError::IoError(format!("Failed to flush file: {e}")))?;
48        Ok(self.location)
49    }
50}
51
52#[async_trait]
53pub trait StorageTrait: Send + Sync {
54    /// Store data at a specific path and return the storage location
55    async fn store(&self, data: &[u8], file_name: &str) -> Result<String, StorageError>;
56
57    /// Store data from an async reader (streaming) and return the storage location.
58    /// This is the object-safe version; for a generic version see [`StorageExt::store_stream`].
59    async fn store_stream_dyn(
60        &self,
61        reader: &mut (dyn AsyncRead + Unpin + Send),
62        file_name: &str,
63    ) -> Result<String, StorageError>;
64
65    /// Create a writer for chunk-based storage
66    /// Allows writing data in chunks without loading entire content into memory
67    async fn create_writer(&self, file_name: &str) -> Result<StorageWriter, StorageError>;
68
69    /// Retrieve data from storage location
70    async fn retrieve(&self, location: &str) -> Result<Vec<u8>, StorageError>;
71
72    /// Check if data exists at location
73    async fn exists(&self, location: &str) -> Result<bool, StorageError>;
74
75    /// Delete data at location
76    async fn delete(&self, location: &str) -> Result<(), StorageError>;
77
78    /// Get the full path for a location
79    fn get_full_path(&self, location: &str) -> PathBuf;
80
81    /// Return the base directory of this storage backend as a string.
82    /// Used to construct `file://` URIs for stored files.
83    /// Returns an empty string for backends that have no filesystem path (e.g. mock, S3).
84    fn base_path(&self) -> &str;
85
86    /// Initialize storage (create directories, etc.)
87    async fn initialize(&self) -> Result<(), StorageError>;
88
89    /// Remove all files from storage.
90    ///
91    /// Equivalent to Python's `get_file_storage(data_root_directory).remove_all()`.
92    /// The storage directory itself is preserved; only its contents are deleted.
93    async fn remove_all(&self) -> Result<(), StorageError>;
94}
95
96/// Extension trait providing generic convenience methods on top of [`StorageTrait`].
97/// Auto-implemented for all types that implement `StorageTrait`.
98#[async_trait]
99pub trait StorageExt: StorageTrait {
100    /// Store data from a typed async reader (streaming).
101    /// Delegates to [`StorageTrait::store_stream_dyn`].
102    async fn store_stream<R: AsyncRead + Unpin + Send>(
103        &self,
104        reader: &mut R,
105        file_name: &str,
106    ) -> Result<String, StorageError> {
107        self.store_stream_dyn(reader, file_name).await
108    }
109}
110
111impl<T: StorageTrait + ?Sized> StorageExt for T {}