use crate::config::OutputConfig;
use crate::error::{PackagerError, PackagerResult};
use camino::{Utf8Path, Utf8PathBuf};
use std::collections::HashMap;
#[cfg(not(feature = "s3"))]
use tracing::warn;
use tracing::{debug, info};
pub struct OutputStructure {
pub root: Utf8PathBuf,
pub variants: HashMap<String, Utf8PathBuf>,
pub manifests: Utf8PathBuf,
}
impl OutputStructure {
#[must_use]
pub fn new(root: Utf8PathBuf) -> Self {
let manifests = root.join("manifests");
Self {
root,
variants: HashMap::new(),
manifests,
}
}
pub fn add_variant(&mut self, name: &str, bitrate: u32) -> Utf8PathBuf {
let variant_dir = self.root.join(format!("{name}_{bitrate}"));
self.variants.insert(name.to_string(), variant_dir.clone());
variant_dir
}
#[must_use]
pub fn get_variant(&self, name: &str) -> Option<&Utf8PathBuf> {
self.variants.get(name)
}
pub async fn create_directories(&self) -> PackagerResult<()> {
tokio::fs::create_dir_all(&self.root).await?;
tokio::fs::create_dir_all(&self.manifests).await?;
for variant_dir in self.variants.values() {
tokio::fs::create_dir_all(variant_dir).await?;
}
info!("Created output directory structure at {}", self.root);
Ok(())
}
}
pub struct OutputManager {
config: OutputConfig,
structure: OutputStructure,
}
impl OutputManager {
pub fn new(config: OutputConfig) -> PackagerResult<Self> {
let root = Utf8PathBuf::try_from(config.directory.clone())
.map_err(|_| PackagerError::invalid_config("Invalid output directory path"))?;
let structure = OutputStructure::new(root);
Ok(Self { config, structure })
}
#[must_use]
pub fn structure(&self) -> &OutputStructure {
&self.structure
}
pub fn structure_mut(&mut self) -> &mut OutputStructure {
&mut self.structure
}
pub async fn initialize(&self) -> PackagerResult<()> {
self.structure.create_directories().await?;
Ok(())
}
pub async fn write_file(&self, relative_path: &Utf8Path, content: &[u8]) -> PackagerResult<()> {
let full_path = self.structure.root.join(relative_path);
if let Some(parent) = full_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&full_path, content).await?;
debug!("Wrote file: {} ({} bytes)", full_path, content.len());
Ok(())
}
pub async fn read_file(&self, relative_path: &Utf8Path) -> PackagerResult<Vec<u8>> {
let full_path = self.structure.root.join(relative_path);
let content = tokio::fs::read(&full_path).await?;
Ok(content)
}
pub async fn delete_file(&self, relative_path: &Utf8Path) -> PackagerResult<()> {
let full_path = self.structure.root.join(relative_path);
if full_path.exists() {
tokio::fs::remove_file(&full_path).await?;
debug!("Deleted file: {}", full_path);
}
Ok(())
}
pub async fn list_files(&self, relative_path: &Utf8Path) -> PackagerResult<Vec<Utf8PathBuf>> {
let full_path = self.structure.root.join(relative_path);
let mut files = Vec::new();
let mut entries = tokio::fs::read_dir(&full_path).await?;
while let Some(entry) = entries.next_entry().await? {
if let Ok(path) = Utf8PathBuf::try_from(entry.path()) {
files.push(path);
}
}
Ok(files)
}
pub async fn cleanup_old_segments(
&self,
variant: &str,
max_segments: usize,
) -> PackagerResult<()> {
let variant_dir = self
.structure
.get_variant(variant)
.ok_or_else(|| PackagerError::invalid_config("Variant not found"))?;
let mut segment_files = Vec::new();
let mut entries = tokio::fs::read_dir(variant_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if let Some(ext) = path.extension() {
if ext == "ts" || ext == "m4s" {
if let Ok(metadata) = entry.metadata().await {
segment_files.push((path.clone(), metadata.modified()?));
}
}
}
}
segment_files.sort_by_key(|(_, time)| *time);
if segment_files.len() > max_segments {
let to_delete = segment_files.len() - max_segments;
for (path, _) in segment_files.iter().take(to_delete) {
tokio::fs::remove_file(path).await?;
debug!("Deleted old segment: {}", path.display());
}
info!(
"Cleaned up {} old segments for variant {}",
to_delete, variant
);
}
Ok(())
}
#[cfg(feature = "s3")]
pub async fn upload_to_s3(&self, relative_path: &Utf8Path) -> PackagerResult<()> {
use aws_sdk_s3::Client;
if !self.config.s3_upload {
return Ok(());
}
let bucket =
self.config.s3_bucket.as_ref().ok_or_else(|| {
PackagerError::UploadFailed("S3 bucket not configured".to_string())
})?;
let key = if let Some(prefix) = &self.config.s3_prefix {
format!("{prefix}/{relative_path}")
} else {
relative_path.to_string()
};
let full_path = self.structure.root.join(relative_path);
let content = tokio::fs::read(&full_path).await?;
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.load()
.await;
let client = Client::new(&config);
client
.put_object()
.bucket(bucket)
.key(&key)
.body(content.into())
.send()
.await
.map_err(|e| PackagerError::UploadFailed(format!("S3 upload failed: {e}")))?;
info!("Uploaded {} to S3: s3://{}/{}", relative_path, bucket, key);
if !self.config.keep_local {
tokio::fs::remove_file(&full_path).await?;
debug!("Deleted local file after upload: {}", full_path);
}
Ok(())
}
#[cfg(not(feature = "s3"))]
pub async fn upload_to_s3(&self, _relative_path: &Utf8Path) -> PackagerResult<()> {
warn!("S3 upload requested but s3 feature is not enabled");
Ok(())
}
#[must_use]
pub fn get_full_path(&self, relative_path: &Utf8Path) -> Utf8PathBuf {
self.structure.root.join(relative_path)
}
#[must_use]
pub fn base_url(&self) -> Option<&str> {
self.config.base_url.as_deref()
}
}
#[derive(Debug, Clone, Copy)]
pub enum CleanupPolicy {
KeepAll,
MaxSegments(usize),
TimeWindow(std::time::Duration),
}
pub struct CleanupManager {
policy: CleanupPolicy,
output_manager: OutputManager,
}
impl CleanupManager {
#[must_use]
pub fn new(policy: CleanupPolicy, output_manager: OutputManager) -> Self {
Self {
policy,
output_manager,
}
}
pub async fn run_cleanup(&self, variant: &str) -> PackagerResult<()> {
match self.policy {
CleanupPolicy::KeepAll => Ok(()),
CleanupPolicy::MaxSegments(max) => {
self.output_manager.cleanup_old_segments(variant, max).await
}
CleanupPolicy::TimeWindow(duration) => {
self.cleanup_by_time_window(variant, duration).await
}
}
}
async fn cleanup_by_time_window(
&self,
variant: &str,
window: std::time::Duration,
) -> PackagerResult<()> {
let variant_dir = self
.output_manager
.structure()
.get_variant(variant)
.ok_or_else(|| PackagerError::invalid_config("Variant not found"))?;
let now = std::time::SystemTime::now();
let cutoff = now - window;
let mut entries = tokio::fs::read_dir(variant_dir).await?;
let mut deleted_count = 0;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if let Ok(metadata) = entry.metadata().await {
if let Ok(modified) = metadata.modified() {
if modified < cutoff {
tokio::fs::remove_file(&path).await?;
deleted_count += 1;
debug!("Deleted old segment: {}", path.display());
}
}
}
}
if deleted_count > 0 {
info!(
"Cleaned up {} segments outside time window for variant {}",
deleted_count, variant
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_output_structure_creation() {
let root =
Utf8PathBuf::from_path_buf(std::env::temp_dir().join("oximedia-packager-output-root"))
.expect("temp path should be valid UTF-8");
let mut structure = OutputStructure::new(root.clone());
let variant_dir = structure.add_variant("video", 1000000);
assert_eq!(variant_dir, root.join("video_1000000"));
assert_eq!(structure.get_variant("video"), Some(&variant_dir));
}
#[test]
fn test_cleanup_policy() {
let policy = CleanupPolicy::MaxSegments(10);
match policy {
CleanupPolicy::MaxSegments(max) => assert_eq!(max, 10),
_ => panic!("Wrong policy type"),
}
}
}