arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime-, config-, and metrics-free.
Documentation
//! Reference [`StorageBackend`] implementations.
//!
//! Gated behind `storage-fs`. The trait itself lives in [`crate::traits`] and
//! always compiles, so a consumer can implement their own backend (S3, GCS,
//! in-memory) without enabling this module.

use crate::traits::StorageBackend;
use crate::{Result, StreamError};
use async_trait::async_trait;
use bytes::Bytes;
use std::path::{Path, PathBuf};

/// A filesystem-backed object store rooted at a base directory.
///
/// Every key is interpreted as a relative path and joined onto `root`; keys
/// that are absolute or contain a `..` segment are rejected before any
/// filesystem access. Missing parent directories are created by `put` and
/// `put_file`, so `root` itself does not have to exist up front.
///
/// `put_file` first attempts a `rename` of the source file into place and
/// only falls back to copy-then-remove when the rename fails (for example
/// when source and destination are on different filesystems).
#[derive(Debug, Clone)]
pub struct FsStorage {
    /// Base directory under which all keys are resolved.
    root: PathBuf,
}

impl FsStorage {
    /// Create a backend rooted at `root`. The directory is created lazily.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Resolve a key to an absolute path, rejecting traversal outside `root`.
    fn resolve(&self, key: &str) -> Result<PathBuf> {
        // Reject absolute keys and parent-dir escapes up front.
        if key.starts_with('/') || key.split('/').any(|seg| seg == "..") {
            return Err(StreamError::storage(format!("invalid key: {key}")));
        }
        Ok(self.root.join(key))
    }
}

#[async_trait]
impl StorageBackend for FsStorage {
    /// Write `data` to the file that `key` resolves to.
    ///
    /// Missing parent directories are created first. An existing file at the
    /// destination is overwritten.
    ///
    /// # Errors
    ///
    /// Fails when the key is rejected by `resolve`, when the parent directory
    /// cannot be created, or when the write itself fails.
    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
        let path = self.resolve(key)?;

        // Make sure the containing directory exists before writing.
        if let Some(parent) = path.parent() {
            if let Err(e) = tokio::fs::create_dir_all(parent).await {
                return Err(StreamError::storage(format!("mkdir {parent:?}: {e}")));
            }
        }

        match tokio::fs::write(&path, &data).await {
            Ok(()) => Ok(()),
            Err(e) => Err(StreamError::storage(format!("write {path:?}: {e}"))),
        }
    }

    /// Read the full contents of the file that `key` resolves to.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::StorageNotFound` carrying the key when the file
    /// does not exist, and a generic storage error for any other read
    /// failure or for a key rejected by `resolve`.
    async fn get(&self, key: &str) -> Result<Bytes> {
        let path = self.resolve(key)?;

        tokio::fs::read(&path)
            .await
            .map(Bytes::from)
            .map_err(|e| match e.kind() {
                // A missing file is reported by key, not by on-disk path.
                std::io::ErrorKind::NotFound => StreamError::StorageNotFound(key.to_string()),
                _ => StreamError::storage(format!("read {path:?}: {e}")),
            })
    }

    /// Remove the file that `key` resolves to.
    ///
    /// Deleting a key that does not exist is treated as success.
    ///
    /// # Errors
    ///
    /// Fails when the key is rejected by `resolve` or when removal fails for
    /// any reason other than the file being absent.
    async fn delete(&self, key: &str) -> Result<()> {
        let path = self.resolve(key)?;

        if let Err(e) = tokio::fs::remove_file(&path).await {
            // An already-missing file is not an error.
            if e.kind() != std::io::ErrorKind::NotFound {
                return Err(StreamError::storage(format!("delete {path:?}: {e}")));
            }
        }
        Ok(())
    }

    /// List every stored key that starts with `prefix`.
    ///
    /// Walks the whole tree under `root` with an explicit stack and keeps each
    /// non-directory entry whose root-relative key begins with `prefix`. The
    /// match is a plain string prefix test, so `"live/a"` also matches
    /// `"live/ab.ts"`. Keys always use `/` as the separator. The result is in
    /// directory-iteration order and is not sorted.
    ///
    /// A directory that does not exist (including `root` itself) is skipped,
    /// so a store that was never written to yields an empty list. Entries
    /// whose relative path is not valid UTF-8 are skipped.
    ///
    /// # Errors
    ///
    /// Fails when a directory cannot be opened for a reason other than being
    /// absent, or when reading a directory entry fails.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        let mut out = Vec::new();
        let mut stack = vec![self.root.clone()];

        while let Some(dir) = stack.pop() {
            let mut entries = match tokio::fs::read_dir(&dir).await {
                Ok(entries) => entries,
                // The directory vanished (or root was never created): nothing to list.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                Err(e) => return Err(StreamError::storage(format!("readdir {dir:?}: {e}"))),
            };

            while let Some(entry) = entries
                .next_entry()
                .await
                .map_err(|e| StreamError::storage(format!("readdir entry: {e}")))?
            {
                let path = entry.path();

                // Async stat instead of the blocking `Path::is_dir`, with the
                // same semantics: symlinks are followed and any error counts
                // as "not a directory".
                let is_dir = tokio::fs::metadata(&path)
                    .await
                    .map(|meta| meta.is_dir())
                    .unwrap_or(false);
                if is_dir {
                    stack.push(path);
                    continue;
                }

                let Some(rel) = path.strip_prefix(&self.root).ok().and_then(|p| p.to_str())
                else {
                    // Not under root or not valid UTF-8: cannot be expressed as a key.
                    continue;
                };

                // Only translate the separator where it is not already `/`;
                // on Unix a backslash is an ordinary file-name character and
                // must survive so the key still maps back to the file.
                let key = if std::path::MAIN_SEPARATOR == '/' {
                    rel.to_owned()
                } else {
                    rel.replace(std::path::MAIN_SEPARATOR, "/")
                };
                if key.starts_with(prefix) {
                    out.push(key);
                }
            }
        }

        Ok(out)
    }

    /// Move the file at `path` into the store under `key`.
    ///
    /// Missing parent directories of the destination are created first. The
    /// file is then renamed into place; if the rename fails for any reason,
    /// the file is copied instead and the source is removed afterwards.
    /// Removal of the source after a copy is best-effort: a failure there is
    /// ignored and the call still succeeds.
    ///
    /// # Errors
    ///
    /// Fails when the key is rejected by `resolve`, when the destination
    /// directory cannot be created, or when both the rename and the fallback
    /// copy fail (only the copy error is reported).
    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
        let dest = self.resolve(key)?;

        if let Some(parent) = dest.parent() {
            if let Err(e) = tokio::fs::create_dir_all(parent).await {
                return Err(StreamError::storage(format!("mkdir {parent:?}: {e}")));
            }
        }

        // Fast path: a rename avoids copying the file contents.
        if tokio::fs::rename(path, &dest).await.is_ok() {
            return Ok(());
        }

        // Slow path (e.g. across filesystems): copy, then drop the source.
        if let Err(e) = tokio::fs::copy(path, &dest).await {
            return Err(StreamError::storage(format!("copy to {dest:?}: {e}")));
        }
        // Best-effort cleanup; the object is already stored at this point.
        let _ = tokio::fs::remove_file(path).await;
        Ok(())
    }
}