1use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10use ulid::Ulid;
11
12#[cfg(feature = "s3")]
13use std::sync::Arc;
14#[cfg(feature = "s3")]
15use zlayer_storage::sync::LayerSyncManager;
16#[cfg(feature = "s3")]
17use zlayer_storage::ContainerLayerId;
18
/// Errors returned by [`StorageManager`] operations.
#[derive(Error, Debug)]
pub enum StorageError {
    /// No volume with the given name is registered with the manager.
    #[error("Volume '{0}' not found")]
    VolumeNotFound(String),

    /// The volume (first field) still has containers attached (second field).
    #[error("Volume '{0}' is in use by containers: {1:?}")]
    VolumeInUse(String, Vec<String>),

    /// A filesystem operation or an external mount command failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// The requested volume name did not pass name validation.
    #[error("Invalid volume name: {0}")]
    InvalidName(String),

    /// Layer sync is not configured, or a sync operation reported an error.
    #[cfg(feature = "s3")]
    #[error("Layer sync error: {0}")]
    LayerSync(String),
}
37
/// Result alias whose error type is always [`StorageError`].
pub type Result<T> = std::result::Result<T, StorageError>;
39
/// Bookkeeping record for one volume tracked by [`StorageManager`].
#[derive(Debug, Clone)]
pub struct VolumeInfo {
    /// Name under which the volume is registered.
    pub name: String,
    /// Directory on disk that backs the volume.
    pub path: PathBuf,
    /// IDs of the containers currently attached to the volume.
    pub attached: HashSet<String>,
    /// `true` for volumes created via `create_anonymous`, `false` for named volumes.
    pub anonymous: bool,
}
52
/// Bookkeeping record for one S3 bucket mounted through `s3fs`.
#[derive(Debug, Clone)]
pub struct S3MountInfo {
    /// Name of the mounted bucket.
    pub bucket: String,
    /// Optional path inside the bucket that was mounted instead of the bucket root.
    pub prefix: Option<String>,
    /// Local directory the bucket is mounted on.
    pub mount_path: PathBuf,
    /// Custom endpoint URL handed to `s3fs`, if one was supplied.
    pub endpoint: Option<String>,
    /// IDs of the containers currently sharing this mount.
    pub attached: HashSet<String>,
}
67
/// Tracks named volumes, anonymous volumes and S3 mounts rooted under one directory.
///
/// Both registries live in memory only; a freshly constructed manager starts empty.
pub struct StorageManager {
    /// Root directory under which all volume directories are created.
    volume_dir: PathBuf,
    /// Registered volumes (named and anonymous), keyed by volume name.
    volumes: HashMap<String, VolumeInfo>,
    /// Active S3 mounts, keyed by the string produced by `s3_mount_key`.
    s3_mounts: HashMap<String, S3MountInfo>,
    /// Layer sync backend; `None` until `set_layer_sync` is called.
    #[cfg(feature = "s3")]
    layer_sync: Option<Arc<LayerSyncManager>>,
    /// Service name used when building volume layer IDs; falls back to `"default"` when unset.
    #[cfg(feature = "s3")]
    service_name: Option<String>,
}
83
84impl StorageManager {
85 pub fn new(base_dir: impl AsRef<Path>) -> Result<Self> {
90 let volume_dir = base_dir.as_ref().to_path_buf();
91 std::fs::create_dir_all(&volume_dir)?;
92
93 Ok(Self {
94 volume_dir,
95 volumes: HashMap::new(),
96 s3_mounts: HashMap::new(),
97 #[cfg(feature = "s3")]
98 layer_sync: None,
99 #[cfg(feature = "s3")]
100 service_name: None,
101 })
102 }
103
104 #[must_use]
106 pub fn volume_dir(&self) -> &Path {
107 &self.volume_dir
108 }
109
/// Configures layer sync for this manager.
///
/// Stores the sync backend together with the service name that is later
/// used to build per-volume layer IDs.
#[cfg(feature = "s3")]
pub fn set_layer_sync(&mut self, sync: Arc<LayerSyncManager>, service_name: impl Into<String>) {
    let service: String = service_name.into();
    self.service_name = Some(service);
    self.layer_sync = Some(sync);
}
120
/// Returns the configured layer sync backend, or `None` when
/// `set_layer_sync` has not been called.
#[cfg(feature = "s3")]
#[must_use]
pub fn layer_sync(&self) -> Option<&Arc<LayerSyncManager>> {
    Option::as_ref(&self.layer_sync)
}
127
/// Builds the layer ID under which a volume is synced.
///
/// The ID pairs the configured service name (or `"default"` when none is
/// set) with `vol-<volume_name>`.
#[cfg(feature = "s3")]
fn volume_layer_id(&self, volume_name: &str) -> ContainerLayerId {
    let service = match &self.service_name {
        Some(configured) => configured.clone(),
        None => String::from("default"),
    };
    let layer_name = format!("vol-{volume_name}");
    ContainerLayerId::new(service, layer_name)
}
141
/// Registers a volume with layer sync and tries to restore its contents
/// into `volume_path`.
///
/// This is best-effort: it does nothing when layer sync is not configured,
/// and every failure is logged rather than returned. A restore error whose
/// message contains `not found`, `NotFound` or `No remote layer` is treated
/// as "no backup exists yet" and only logged at debug level.
#[cfg(feature = "s3")]
async fn register_and_restore_volume(&self, volume_name: &str, volume_path: &Path) {
    let sync = match self.layer_sync.as_ref() {
        Some(sync) => sync,
        None => return,
    };

    let layer_id = self.volume_layer_id(volume_name);

    // Restoring is pointless if the layer could not even be registered.
    if let Err(e) = sync.register_container(layer_id.clone()).await {
        tracing::warn!(
            volume = %volume_name,
            error = %e,
            "failed to register volume with layer sync"
        );
        return;
    }

    tracing::debug!(
        volume = %volume_name,
        layer_id = %layer_id,
        "registered volume with layer sync"
    );

    let restore_error = match sync.restore_layer(&layer_id, volume_path).await {
        Ok(snapshot) => {
            tracing::info!(
                volume = %volume_name,
                digest = %snapshot.digest,
                size = snapshot.size_bytes,
                "restored volume from S3 backup"
            );
            return;
        }
        Err(e) => e,
    };

    // The error is classified by its message text: these substrings are
    // taken to mean that no backup has been uploaded for the volume yet.
    let msg = restore_error.to_string();
    let backup_missing = ["not found", "NotFound", "No remote layer"]
        .iter()
        .any(|needle| msg.contains(*needle));

    if backup_missing {
        tracing::debug!(
            volume = %volume_name,
            "no S3 backup found for volume (first use)"
        );
    } else {
        tracing::warn!(
            volume = %volume_name,
            error = %restore_error,
            "failed to restore volume from S3"
        );
    }
}
202
/// Uploads one registered volume through layer sync.
///
/// Returns `Ok(true)` when the sync produced a new snapshot and `Ok(false)`
/// when the backend reported that nothing needed uploading.
///
/// # Errors
///
/// * [`StorageError::LayerSync`] when layer sync is not configured or the
///   sync itself fails.
/// * [`StorageError::VolumeNotFound`] when `volume_name` is not registered.
#[cfg(feature = "s3")]
pub async fn sync_volume(&self, volume_name: &str) -> Result<bool> {
    let Some(sync) = self.layer_sync.as_ref() else {
        return Err(StorageError::LayerSync(
            "layer sync not configured".to_string(),
        ));
    };

    let Some(volume) = self.volumes.get(volume_name) else {
        return Err(StorageError::VolumeNotFound(volume_name.to_string()));
    };

    let layer_id = self.volume_layer_id(volume_name);

    let outcome = sync
        .sync_layer(&layer_id, &volume.path)
        .await
        .map_err(|e| {
            tracing::error!(
                volume = %volume_name,
                error = %e,
                "failed to sync volume to S3"
            );
            StorageError::LayerSync(format!("failed to sync volume '{volume_name}': {e}"))
        })?;

    if let Some(snapshot) = outcome {
        tracing::info!(
            volume = %volume_name,
            digest = %snapshot.digest,
            compressed_size = snapshot.compressed_size_bytes,
            "synced volume to S3"
        );
        Ok(true)
    } else {
        tracing::debug!(
            volume = %volume_name,
            "volume unchanged, no sync needed"
        );
        Ok(false)
    }
}
255
/// Syncs every named (non-anonymous) volume and returns how many produced
/// a new snapshot.
///
/// Returns `Ok(0)` immediately when layer sync is not configured. A volume
/// that fails to sync is logged and skipped, so one failure does not stop
/// the others from being attempted.
#[cfg(feature = "s3")]
pub async fn sync_all_volumes(&self) -> Result<usize> {
    if self.layer_sync.is_none() {
        return Ok(0);
    }

    let named: Vec<&str> = self
        .volumes
        .values()
        .filter(|volume| !volume.anonymous)
        .map(|volume| volume.name.as_str())
        .collect();

    let mut synced: usize = 0;
    for name in named.iter().copied() {
        match self.sync_volume(name).await {
            Ok(uploaded) => {
                if uploaded {
                    synced += 1;
                }
            }
            Err(e) => {
                tracing::warn!(
                    volume = %name,
                    error = %e,
                    "failed to sync volume, continuing with others"
                );
            }
        }
    }

    if synced > 0 {
        tracing::info!(
            synced_count = synced,
            total = named.len(),
            "volume sync complete"
        );
    }

    Ok(synced)
}
302
303 pub fn create_anonymous(&mut self, container_id: &str, target: &str) -> Result<PathBuf> {
309 let ulid = Ulid::new().to_string().to_lowercase();
310 let _safe_target = target.trim_start_matches('/').replace('/', "_");
312 let name = format!("_anon_{container_id}_{ulid}");
313
314 let anon_dir = self.volume_dir.join("_anon");
315 let volume_path = anon_dir.join(format!("{container_id}-{ulid}"));
316
317 std::fs::create_dir_all(&volume_path)?;
318
319 let mut attached = HashSet::new();
320 attached.insert(container_id.to_string());
321
322 self.volumes.insert(
323 name.clone(),
324 VolumeInfo {
325 name,
326 path: volume_path.clone(),
327 attached,
328 anonymous: true,
329 },
330 );
331
332 Ok(volume_path)
333 }
334
335 pub fn cleanup_anonymous(&mut self, container_id: &str) -> Result<()> {
340 let to_remove: Vec<String> = self
342 .volumes
343 .iter()
344 .filter(|(_, v)| v.anonymous && v.attached.contains(container_id))
345 .map(|(k, _)| k.clone())
346 .collect();
347
348 for name in to_remove {
349 if let Some(volume) = self.volumes.remove(&name) {
350 if volume.path.exists() {
351 std::fs::remove_dir_all(&volume.path)?;
352 }
353 }
354 }
355
356 Ok(())
357 }
358
359 #[must_use]
361 pub fn list_anonymous(&self, container_id: &str) -> Vec<&VolumeInfo> {
362 self.volumes
363 .values()
364 .filter(|v| v.anonymous && v.attached.contains(container_id))
365 .collect()
366 }
367
368 pub fn ensure_volume(&mut self, name: &str) -> Result<PathBuf> {
373 if !Self::is_valid_name(name) {
375 return Err(StorageError::InvalidName(name.to_string()));
376 }
377
378 let volume_path = self.volume_dir.join(name);
379
380 if !self.volumes.contains_key(name) {
381 std::fs::create_dir_all(&volume_path)?;
382
383 self.volumes.insert(
384 name.to_string(),
385 VolumeInfo {
386 name: name.to_string(),
387 path: volume_path.clone(),
388 attached: HashSet::new(),
389 anonymous: false,
390 },
391 );
392 }
393
394 Ok(volume_path)
395 }
396
397 #[allow(clippy::unused_async)]
407 pub async fn ensure_volume_with_sync(&mut self, name: &str) -> Result<PathBuf> {
408 let is_new = !self.volumes.contains_key(name);
409 let path = self.ensure_volume(name)?;
410
411 #[cfg(feature = "s3")]
412 if is_new {
413 self.register_and_restore_volume(name, &path).await;
414 }
415
416 #[cfg(not(feature = "s3"))]
417 let _ = is_new; Ok(path)
420 }
421
422 pub fn attach_volume(&mut self, name: &str, container_id: &str) -> Result<()> {
427 let volume = self
428 .volumes
429 .get_mut(name)
430 .ok_or_else(|| StorageError::VolumeNotFound(name.to_string()))?;
431
432 volume.attached.insert(container_id.to_string());
433 Ok(())
434 }
435
436 pub fn detach_volume(&mut self, name: &str, container_id: &str) -> Result<()> {
441 if let Some(volume) = self.volumes.get_mut(name) {
442 volume.attached.remove(container_id);
443 }
444 Ok(())
445 }
446
447 pub fn delete_volume(&mut self, name: &str) -> Result<()> {
452 let volume = self
453 .volumes
454 .get(name)
455 .ok_or_else(|| StorageError::VolumeNotFound(name.to_string()))?;
456
457 if !volume.attached.is_empty() {
458 return Err(StorageError::VolumeInUse(
459 name.to_string(),
460 volume.attached.iter().cloned().collect(),
461 ));
462 }
463
464 let path = volume.path.clone();
465 self.volumes.remove(name);
466
467 if path.exists() {
468 std::fs::remove_dir_all(&path)?;
469 }
470
471 Ok(())
472 }
473
474 #[must_use]
476 pub fn list_volumes(&self) -> Vec<&VolumeInfo> {
477 self.volumes.values().collect()
478 }
479
480 #[must_use]
482 pub fn get_volume(&self, name: &str) -> Option<&VolumeInfo> {
483 self.volumes.get(name)
484 }
485
486 fn s3_mount_dir(&self) -> PathBuf {
488 self.volume_dir.join("s3mounts")
489 }
490
/// Builds the registry key (also used as the mount directory name) for a
/// bucket and optional prefix.
///
/// Without a prefix the key is the bucket name. With one it is
/// `<bucket>_<prefix>`, where every `/` in the prefix is replaced by `_`.
///
/// The mapping is not injective: prefixes `a/b` and `a_b` on the same
/// bucket produce the same key.
fn s3_mount_key(bucket: &str, prefix: Option<&str>) -> String {
    let Some(prefix) = prefix else {
        return bucket.to_owned();
    };

    let mut key = String::with_capacity(bucket.len() + 1 + prefix.len());
    key.push_str(bucket);
    key.push('_');
    key.extend(prefix.chars().map(|c| if c == '/' { '_' } else { c }));
    key
}
498
499 pub fn mount_s3(
507 &mut self,
508 bucket: &str,
509 prefix: Option<&str>,
510 endpoint: Option<&str>,
511 container_id: &str,
512 ) -> Result<PathBuf> {
513 let key = Self::s3_mount_key(bucket, prefix);
514
515 if let Some(info) = self.s3_mounts.get_mut(&key) {
517 info.attached.insert(container_id.to_string());
518 return Ok(info.mount_path.clone());
519 }
520
521 let mount_dir = self.s3_mount_dir();
523 std::fs::create_dir_all(&mount_dir)?;
524
525 let mount_path = mount_dir.join(&key);
526 std::fs::create_dir_all(&mount_path)?;
527
528 let mut cmd = std::process::Command::new("s3fs");
530
531 let bucket_arg = match prefix {
533 Some(p) => format!("{}:/{}", bucket, p.trim_start_matches('/')),
534 None => bucket.to_string(),
535 };
536 cmd.arg(&bucket_arg);
537 cmd.arg(&mount_path);
538
539 let mut options = vec!["allow_other".to_string(), "mp_umask=022".to_string()];
541
542 if let Some(ep) = endpoint {
543 options.push(format!("url={ep}"));
544 options.push("use_path_request_style".to_string());
545 }
546
547 cmd.arg("-o");
548 cmd.arg(options.join(","));
549
550 tracing::info!(
551 bucket = %bucket,
552 prefix = ?prefix,
553 mount_path = %mount_path.display(),
554 "mounting S3 bucket via s3fs"
555 );
556
557 let output = cmd.output().map_err(|e| {
559 StorageError::Io(std::io::Error::other(format!(
560 "failed to execute s3fs: {e}"
561 )))
562 })?;
563
564 if !output.status.success() {
565 let stderr = String::from_utf8_lossy(&output.stderr);
566 return Err(StorageError::Io(std::io::Error::other(format!(
567 "s3fs mount failed: {stderr}"
568 ))));
569 }
570
571 let mut attached = HashSet::new();
573 attached.insert(container_id.to_string());
574
575 self.s3_mounts.insert(
576 key,
577 S3MountInfo {
578 bucket: bucket.to_string(),
579 prefix: prefix.map(String::from),
580 mount_path: mount_path.clone(),
581 endpoint: endpoint.map(String::from),
582 attached,
583 },
584 );
585
586 Ok(mount_path)
587 }
588
589 pub fn unmount_s3(
594 &mut self,
595 bucket: &str,
596 prefix: Option<&str>,
597 container_id: &str,
598 ) -> Result<()> {
599 let key = Self::s3_mount_key(bucket, prefix);
600
601 let should_unmount = if let Some(info) = self.s3_mounts.get_mut(&key) {
602 info.attached.remove(container_id);
603 info.attached.is_empty()
604 } else {
605 return Ok(()); };
607
608 if should_unmount {
609 if let Some(info) = self.s3_mounts.remove(&key) {
610 let output = std::process::Command::new("fusermount")
612 .arg("-u")
613 .arg(&info.mount_path)
614 .output();
615
616 match output {
617 Ok(o) if !o.status.success() => {
618 let stderr = String::from_utf8_lossy(&o.stderr);
619 tracing::warn!(
620 bucket = %bucket,
621 error = %stderr,
622 "failed to unmount S3, attempting lazy unmount"
623 );
624 let _ = std::process::Command::new("fusermount")
626 .arg("-uz")
627 .arg(&info.mount_path)
628 .output();
629 }
630 Err(e) => {
631 tracing::warn!(bucket = %bucket, error = %e, "failed to execute fusermount");
632 }
633 _ => {}
634 }
635
636 let _ = std::fs::remove_dir(&info.mount_path);
638
639 tracing::info!(bucket = %bucket, "S3 bucket unmounted");
640 }
641 }
642
643 Ok(())
644 }
645
646 #[must_use]
648 pub fn list_s3_mounts(&self) -> Vec<&S3MountInfo> {
649 self.s3_mounts.values().collect()
650 }
651
652 #[must_use]
654 pub fn get_s3_mount(&self, bucket: &str, prefix: Option<&str>) -> Option<&S3MountInfo> {
655 let key = Self::s3_mount_key(bucket, prefix);
656 self.s3_mounts.get(&key)
657 }
658
/// Checks whether `name` is acceptable as a volume name.
///
/// A valid name is 1 to 63 bytes long, consists only of ASCII lowercase
/// letters, ASCII digits and `-`, and neither starts nor ends with `-`.
fn is_valid_name(name: &str) -> bool {
    let bytes = name.as_bytes();
    if bytes.len() > 63 {
        return false;
    }

    let (first, last) = match bytes {
        [] => return false,
        [only] => (only, only),
        [first, .., last] => (first, last),
    };
    if !first.is_ascii_alphanumeric() || !last.is_ascii_alphanumeric() {
        return false;
    }

    // Every byte of a non-ASCII character is >= 0x80 and is rejected here,
    // so working on bytes gives the same answer as working on chars.
    bytes
        .iter()
        .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-'))
}
680}
681
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned too so the directory lives as long as the test.
    fn setup() -> (TempDir, StorageManager) {
        let root = TempDir::new().unwrap();
        let storage = StorageManager::new(root.path()).unwrap();
        (root, storage)
    }

    /// A named volume is created on disk and repeated calls return the same path.
    #[test]
    fn test_ensure_named_volume() {
        let (_root, mut storage) = setup();

        let first = storage.ensure_volume("my-data").unwrap();
        assert!(first.exists());
        assert!(first.ends_with("my-data"));

        let second = storage.ensure_volume("my-data").unwrap();
        assert_eq!(first, second);
    }

    /// Attaching records the container; detaching removes it again.
    #[test]
    fn test_attach_detach_volume() {
        let (_root, mut storage) = setup();

        storage.ensure_volume("test-vol").unwrap();
        storage.attach_volume("test-vol", "container-1").unwrap();

        let attached = storage.get_volume("test-vol").unwrap();
        assert!(attached.attached.contains("container-1"));

        storage.detach_volume("test-vol", "container-1").unwrap();
        let detached = storage.get_volume("test-vol").unwrap();
        assert!(!detached.attached.contains("container-1"));
    }

    /// Deleting an unused volume removes both the directory and the registry entry.
    #[test]
    fn test_delete_volume_success() {
        let (_root, mut storage) = setup();

        let dir = storage.ensure_volume("deleteme").unwrap();
        assert!(dir.exists());

        storage.delete_volume("deleteme").unwrap();
        assert!(!dir.exists());
        assert!(storage.get_volume("deleteme").is_none());
    }

    /// A volume with an attached container cannot be deleted.
    #[test]
    fn test_delete_volume_in_use_fails() {
        let (_root, mut storage) = setup();

        storage.ensure_volume("in-use").unwrap();
        storage.attach_volume("in-use", "container-1").unwrap();

        let outcome = storage.delete_volume("in-use");
        assert!(matches!(outcome, Err(StorageError::VolumeInUse(_, _))));
    }

    /// An anonymous volume is created on disk and listed for its container.
    #[test]
    fn test_create_anonymous_volume() {
        let (_root, mut storage) = setup();

        let dir = storage
            .create_anonymous("container-1", "/app/cache")
            .unwrap();
        assert!(dir.exists());

        let listed = storage.list_anonymous("container-1");
        assert_eq!(listed.len(), 1);
        assert!(listed[0].anonymous);
    }

    /// Cleanup removes only the anonymous volumes of the given container.
    #[test]
    fn test_cleanup_anonymous_volumes() {
        let (_root, mut storage) = setup();

        let first = storage.create_anonymous("container-1", "/cache1").unwrap();
        let second = storage.create_anonymous("container-1", "/cache2").unwrap();
        let _other = storage.create_anonymous("container-2", "/other").unwrap();

        assert!(first.exists());
        assert!(second.exists());

        storage.cleanup_anonymous("container-1").unwrap();

        assert!(!first.exists());
        assert!(!second.exists());

        let survivors = storage.list_anonymous("container-2");
        assert_eq!(survivors.len(), 1);
    }

    /// Valid names are accepted; malformed ones yield `InvalidName`.
    #[test]
    fn test_volume_name_validation() {
        let (_root, mut storage) = setup();

        for valid in ["a", "my-volume", "vol123", "a1b2c3"] {
            assert!(storage.ensure_volume(valid).is_ok());
        }

        for invalid in ["-invalid", "invalid-", "UPPERCASE", "has_underscore", ""] {
            assert!(matches!(
                storage.ensure_volume(invalid),
                Err(StorageError::InvalidName(_))
            ));
        }
    }

    /// Every ensured volume shows up in the listing.
    #[test]
    fn test_list_volumes() {
        let (_root, mut storage) = setup();

        for name in ["vol1", "vol2", "vol3"] {
            storage.ensure_volume(name).unwrap();
        }

        let listed = storage.list_volumes();
        assert_eq!(listed.len(), 3);

        let names: Vec<&str> = listed.iter().map(|v| v.name.as_str()).collect();
        for expected in ["vol1", "vol2", "vol3"] {
            assert!(names.contains(&expected));
        }
    }
}