// polykit_cache/storage.rs

//! Artifact storage with directory sharding and atomic operations.

use std::fs;
use std::path::{Path, PathBuf};

use polykit_core::error::{Error, Result};
use polykit_core::remote_cache::Artifact;
/// Storage metadata for an artifact.
///
/// Persisted by `Storage::store_artifact` as a `.json` sidecar next to each
/// `.zst` artifact file and read back by `Storage::read_metadata`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageMetadata {
    /// SHA-256 hash of the compressed artifact.
    pub hash: String,
    /// Size of the compressed artifact in bytes.
    pub size: u64,
    /// Timestamp when artifact was created (Unix epoch seconds).
    pub created_at: u64,
    /// Cache key hash.
    pub cache_key_hash: String,
}
/// Manages artifact storage with directory sharding.
pub struct Storage {
    /// Root directory; shard subdirectories and the `tmp/` staging
    /// directory live beneath it.
    storage_root: PathBuf,
    /// Upper bound (bytes) enforced on stored artifact payloads.
    max_artifact_size: u64,
}
28impl Storage {
29    /// Creates a new storage instance.
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if the storage directory cannot be created.
34    pub fn new(storage_root: impl AsRef<Path>, max_artifact_size: u64) -> Result<Self> {
35        let storage_root = storage_root.as_ref().to_path_buf();
36        fs::create_dir_all(&storage_root).map_err(Error::Io)?;
37
38        // Create tmp directory for temporary uploads
39        let tmp_dir = storage_root.join("tmp");
40        fs::create_dir_all(&tmp_dir).map_err(Error::Io)?;
41
42        Ok(Self {
43            storage_root,
44            max_artifact_size,
45        })
46    }
47
48    /// Gets the shard directory path for a cache key.
49    ///
50    /// Uses first 4 characters of the cache key hash for sharding:
51    /// `aa/bb/` from hash `aabb...`
52    fn shard_path(&self, cache_key: &str) -> PathBuf {
53        if cache_key.len() < 4 {
54            // Fallback for very short keys
55            return self.storage_root.join("00").join("00");
56        }
57
58        let prefix = &cache_key[..4];
59        let dir1 = &prefix[..2];
60        let dir2 = &prefix[2..4];
61
62        self.storage_root.join(dir1).join(dir2)
63    }
64
65    /// Gets the artifact file path for a cache key.
66    fn artifact_path(&self, cache_key: &str) -> PathBuf {
67        self.shard_path(cache_key).join(format!("{}.zst", cache_key))
68    }
69
70    /// Gets the metadata file path for a cache key.
71    fn metadata_path(&self, cache_key: &str) -> PathBuf {
72        self.shard_path(cache_key).join(format!("{}.json", cache_key))
73    }
74
75    /// Checks if an artifact exists.
76    pub fn has_artifact(&self, cache_key: &str) -> bool {
77        self.artifact_path(cache_key).exists()
78    }
79
80    /// Gets the temporary file path for an upload.
81    fn temp_path(&self) -> PathBuf {
82        let uuid = uuid::Uuid::new_v4();
83        self.storage_root.join("tmp").join(format!("{}.tmp", uuid))
84    }
85
86    /// Stores an artifact atomically.
87    ///
88    /// # Arguments
89    ///
90    /// * `cache_key` - The cache key hash
91    /// * `data` - The compressed artifact data
92    /// * `hash` - SHA-256 hash of the data
93    /// * `artifact` - The artifact (for metadata access)
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if storage fails or artifact already exists.
98    pub async fn store_artifact(
99        &self,
100        cache_key: &str,
101        data: Vec<u8>,
102        hash: String,
103        artifact: &Artifact,
104    ) -> Result<()> {
105        // Validate cache key format (should be hex string)
106        if !cache_key.chars().all(|c| c.is_ascii_hexdigit()) {
107            return Err(Error::Adapter {
108                package: "storage".to_string(),
109                message: format!("Invalid cache key format: {}", cache_key),
110            });
111        }
112
113        // Check size limit
114        if data.len() as u64 > self.max_artifact_size {
115            return Err(Error::Adapter {
116                package: "storage".to_string(),
117                message: format!(
118                    "Artifact size {} exceeds maximum {}",
119                    data.len(),
120                    self.max_artifact_size
121                ),
122            });
123        }
124
125        // Check if artifact already exists (immutable)
126        if self.has_artifact(cache_key) {
127            return Err(Error::Adapter {
128                package: "storage".to_string(),
129                message: format!("Artifact {} already exists", cache_key),
130            });
131        }
132
133        // Write to temporary file
134        let temp_path = self.temp_path();
135        tokio::fs::write(&temp_path, &data).await.map_err(Error::Io)?;
136
137        // Determine shard directory
138        let shard_dir = self.shard_path(cache_key);
139        fs::create_dir_all(&shard_dir).map_err(Error::Io)?;
140
141        // Atomic rename
142        let artifact_path = self.artifact_path(cache_key);
143        fs::rename(&temp_path, &artifact_path).map_err(|e| {
144            // Clean up temp file on error
145            let _ = fs::remove_file(&temp_path);
146            Error::Io(e)
147        })?;
148
149        // Write metadata
150        let artifact_metadata = artifact.metadata();
151        let storage_metadata = StorageMetadata {
152            hash,
153            size: data.len() as u64,
154            created_at: artifact_metadata.created_at,
155            cache_key_hash: artifact_metadata.cache_key_hash.clone(),
156        };
157
158        let metadata_json = serde_json::to_string(&storage_metadata).map_err(|e| Error::Adapter {
159            package: "storage".to_string(),
160            message: format!("Failed to serialize metadata: {}", e),
161        })?;
162
163        let metadata_path = self.metadata_path(cache_key);
164        fs::write(&metadata_path, metadata_json).map_err(Error::Io)?;
165
166        Ok(())
167    }
168
169    /// Reads an artifact.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the artifact doesn't exist or cannot be read.
174    pub async fn read_artifact(&self, cache_key: &str) -> Result<Vec<u8>> {
175        let artifact_path = self.artifact_path(cache_key);
176
177        if !artifact_path.exists() {
178            return Err(Error::Adapter {
179                package: "storage".to_string(),
180                message: format!("Artifact {} not found", cache_key),
181            });
182        }
183
184        tokio::fs::read(&artifact_path).await.map_err(Error::Io)
185    }
186
187    /// Reads artifact metadata.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if metadata doesn't exist or cannot be read.
192    pub async fn read_metadata(&self, cache_key: &str) -> Result<StorageMetadata> {
193        let metadata_path = self.metadata_path(cache_key);
194
195        if !metadata_path.exists() {
196            return Err(Error::Adapter {
197                package: "storage".to_string(),
198                message: format!("Metadata for {} not found", cache_key),
199            });
200        }
201
202        let content = tokio::fs::read_to_string(&metadata_path)
203            .await
204            .map_err(Error::Io)?;
205
206        serde_json::from_str(&content).map_err(|e| Error::Adapter {
207            package: "storage".to_string(),
208            message: format!("Failed to parse metadata: {}", e),
209        })
210    }
211
212    /// Returns the maximum artifact size.
213    pub fn max_artifact_size(&self) -> u64 {
214        self.max_artifact_size
215    }
216
217    /// Cleans up temporary files older than the specified duration.
218    ///
219    /// This should be called periodically to clean up failed uploads.
220    pub fn cleanup_temp_files(&self) -> Result<()> {
221        let tmp_dir = self.storage_root.join("tmp");
222
223        if !tmp_dir.exists() {
224            return Ok(());
225        }
226
227        for entry in fs::read_dir(&tmp_dir).map_err(Error::Io)? {
228            let entry = entry.map_err(Error::Io)?;
229            let path = entry.path();
230
231            if path.extension().and_then(|s| s.to_str()) == Some("tmp") {
232                // Try to remove, ignore errors for concurrent access
233                let _ = fs::remove_file(&path);
234            }
235        }
236
237        Ok(())
238    }
239}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Hex cache key used across the storage tests.
    const CACHE_KEY: &str = "aabbccdd11223344556677889900aabbccddeeff";

    /// Builds a minimal artifact bound to `cache_key` for storage tests.
    fn make_artifact(cache_key: &str) -> polykit_core::remote_cache::Artifact {
        polykit_core::remote_cache::Artifact::new(
            "test".to_string(),
            "build".to_string(),
            "echo".to_string(),
            cache_key.to_string(),
            std::collections::BTreeMap::new(),
        )
        .unwrap()
    }

    // shard_path is synchronous, so no async runtime is needed here —
    // a plain #[test] avoids spinning up tokio for nothing.
    #[test]
    fn test_storage_sharding() {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(temp_dir.path(), 1024 * 1024).unwrap();

        let shard_path = storage.shard_path(CACHE_KEY);

        // Path::ends_with compares components, so "aa/bb" works on all
        // platforms regardless of the native separator.
        assert!(shard_path.ends_with("aa/bb"));
    }

    #[tokio::test]
    async fn test_storage_atomic_write() {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(temp_dir.path(), 1024 * 1024).unwrap();

        let data = b"test data".to_vec();
        let hash = "test_hash".to_string();
        let artifact = make_artifact(CACHE_KEY);

        // Store artifact
        storage
            .store_artifact(CACHE_KEY, data.clone(), hash, &artifact)
            .await
            .unwrap();

        // Verify artifact exists
        assert!(storage.has_artifact(CACHE_KEY));

        // Verify we can read it back
        let read_data = storage.read_artifact(CACHE_KEY).await.unwrap();
        assert_eq!(read_data, data);

        // Verify metadata
        let read_metadata = storage.read_metadata(CACHE_KEY).await.unwrap();
        assert_eq!(read_metadata.cache_key_hash, CACHE_KEY);
    }

    #[tokio::test]
    async fn test_storage_immutable() {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(temp_dir.path(), 1024 * 1024).unwrap();

        // First store succeeds.
        let artifact1 = make_artifact(CACHE_KEY);
        storage
            .store_artifact(CACHE_KEY, b"test data".to_vec(), "test_hash".to_string(), &artifact1)
            .await
            .unwrap();

        // Second store for the same key must fail: artifacts are immutable.
        let artifact2 = make_artifact(CACHE_KEY);
        let result = storage
            .store_artifact(CACHE_KEY, b"different".to_vec(), "hash2".to_string(), &artifact2)
            .await;

        assert!(result.is_err());
    }
}