use crate::atp::cache::{AtpCache, CacheError, CacheKey};
use crate::atp::identity::IdentityError;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
/// Operator-facing configuration for the ATP seeding service.
///
/// The `Default` implementation produces a configuration with seeding
/// switched off.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SeedingConfig {
/// Master switch. While `false`, `AtpSeedingService::authorize_manifest` and
/// `AtpSeedingService::start_session` return `SeedingError::SeedingDisabled`.
pub enabled: bool,
/// Upload bandwidth cap in bytes per second.
/// NOTE(review): not read anywhere in this file; `None` presumably means
/// "no cap" — confirm where this is enforced.
pub max_bandwidth_bytes_per_second: Option<u64>,
/// Storage cap in bytes.
/// NOTE(review): not read anywhere in this file; `None` presumably means
/// "no cap" — confirm where this is enforced.
pub max_storage_bytes: Option<u64>,
/// Maximum number of simultaneously active sessions. `start_session` returns
/// `SeedingError::TooManyConnections` once this many sessions are active;
/// `None` disables the check.
pub max_concurrent_connections: Option<u32>,
/// Hashes of the manifests that have been authorized. The service adds an
/// entry in `authorize_manifest` and removes it in `revoke_manifest`.
pub authorized_manifests: HashSet<String>,
/// Priority levels that may be named when authorizing a manifest;
/// `authorize_manifest` rejects any priority whose name is not listed here.
pub priority_levels: Vec<SeedingPriority>,
/// NOTE(review): not read anywhere in this file — the grant checks in the
/// service run regardless of this flag. Confirm the intended semantics.
pub require_explicit_grants: bool,
/// Root directory for seeded data.
/// NOTE(review): not read anywhere in this file — confirm how it is used.
pub seed_root: PathBuf,
}
impl Default for SeedingConfig {
    /// Returns the baseline configuration: seeding disabled, 10 MiB/s of
    /// bandwidth, 1 GiB of storage, ten concurrent connections, no authorized
    /// manifests, the three built-in priority levels (high, normal, low),
    /// explicit grants required, and `./seeds` as the seed root.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        let priority_levels = vec![
            SeedingPriority::high(),
            SeedingPriority::normal(),
            SeedingPriority::low(),
        ];

        Self {
            enabled: false,
            max_bandwidth_bytes_per_second: Some(10 * MIB),
            max_storage_bytes: Some(GIB),
            max_concurrent_connections: Some(10),
            authorized_manifests: HashSet::new(),
            priority_levels,
            require_explicit_grants: true,
            seed_root: PathBuf::from("./seeds"),
        }
    }
}
/// A named priority level that a manifest authorization can refer to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SeedingPriority {
/// Name used to select this level; `AtpSeedingService::authorize_manifest`
/// matches the requested priority string against it.
pub name: String,
/// Numeric rank. The built-in presets use 100 (high), 50 (normal), 10 (low).
pub priority: u32,
/// Bandwidth share for this level. The built-in presets use 0.5, 0.3 and 0.2.
/// NOTE(review): presumably a fraction of the configured bandwidth cap; it is
/// not consumed anywhere in this file — confirm.
pub bandwidth_ratio: f64,
/// Set only by the `high` preset.
/// NOTE(review): presumably reserves storage for this level; it is not
/// consumed anywhere in this file — confirm.
pub dedicated_storage: bool,
}
impl SeedingPriority {
    /// Shared constructor behind the built-in presets.
    fn preset(name: &str, priority: u32, bandwidth_ratio: f64, dedicated_storage: bool) -> Self {
        Self {
            name: name.to_owned(),
            priority,
            bandwidth_ratio,
            dedicated_storage,
        }
    }

    /// The `"high"` level: rank 100, bandwidth ratio 0.5, dedicated storage.
    #[must_use]
    pub fn high() -> Self {
        Self::preset("high", 100, 0.5, true)
    }

    /// The `"normal"` level: rank 50, bandwidth ratio 0.3, no dedicated storage.
    #[must_use]
    pub fn normal() -> Self {
        Self::preset("normal", 50, 0.3, false)
    }

    /// The `"low"` level: rank 10, bandwidth ratio 0.2, no dedicated storage.
    #[must_use]
    pub fn low() -> Self {
        Self::preset("low", 10, 0.2, false)
    }
}
/// Record stating that a manifest may be seeded, under which grant scope and
/// at which priority.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestAuthorization {
/// Hash identifying the authorized manifest.
pub manifest_hash: String,
/// Scope a requester must present to be served. The service treats the
/// scopes `"public"` and `"public-read"` as open to every requester.
pub grant_scope: String,
/// Time at which this record was created (set by `new`).
pub created_at: SystemTime,
/// Expiry instant; `None` means the authorization never expires.
pub expires_at: Option<SystemTime>,
/// Name of the priority level assigned to the manifest.
pub priority: String,
/// When `false`, `is_valid` reports the authorization as invalid regardless
/// of `expires_at`.
pub active: bool,
}
impl ManifestAuthorization {
    /// Creates an active authorization stamped with the current time and no
    /// expiry.
    #[must_use]
    pub fn new(manifest_hash: String, grant_scope: String, priority: String) -> Self {
        let created_at = SystemTime::now();
        Self {
            manifest_hash,
            grant_scope,
            created_at,
            expires_at: None,
            priority,
            active: true,
        }
    }

    /// Reports whether the authorization can currently be used: it must be
    /// active and, if an expiry is set, that instant must still lie in the
    /// future.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        if !self.active {
            return false;
        }
        match self.expires_at {
            None => true,
            Some(deadline) => deadline > SystemTime::now(),
        }
    }
}
/// Serves cached content for explicitly authorized manifests and tracks the
/// sessions and counters associated with doing so.
#[derive(Debug)]
pub struct AtpSeedingService {
// Configuration supplied at construction; its `authorized_manifests` set is
// kept in step with `authorizations` by authorize/revoke.
config: SeedingConfig,
// Backing store for seeded content.
cache: AtpCache,
// Authorization records keyed by manifest hash.
authorizations: HashMap<String, ManifestAuthorization>,
// Sessions currently in progress, keyed by session id.
active_sessions: HashMap<String, SeedingSession>,
// Running counters exposed through `metrics()`.
metrics: SeedingMetrics,
}
impl AtpSeedingService {
pub fn new(config: SeedingConfig, cache: AtpCache) -> Self {
Self {
config,
cache,
authorizations: HashMap::new(),
active_sessions: HashMap::new(),
metrics: SeedingMetrics::default(),
}
}
pub fn authorize_manifest(
&mut self,
manifest_hash: String,
grant_scope: String,
priority: String,
) -> Result<(), SeedingError> {
if !self.config.enabled {
return Err(SeedingError::SeedingDisabled);
}
if !self
.config
.priority_levels
.iter()
.any(|p| p.name == priority)
{
return Err(SeedingError::InvalidPriority(priority));
}
let authorization =
ManifestAuthorization::new(manifest_hash.clone(), grant_scope, priority);
self.authorizations
.insert(manifest_hash.clone(), authorization);
self.config.authorized_manifests.insert(manifest_hash);
Ok(())
}
pub fn revoke_manifest(&mut self, manifest_hash: &str) -> Result<(), SeedingError> {
self.authorizations.remove(manifest_hash);
self.config.authorized_manifests.remove(manifest_hash);
self.active_sessions
.retain(|_, session| session.manifest_hash != manifest_hash);
Ok(())
}
#[must_use]
pub fn is_authorized(&self, manifest_hash: &str) -> bool {
if let Some(auth) = self.authorizations.get(manifest_hash) {
auth.is_valid()
} else {
false
}
}
pub fn get_seeded_content(
&mut self,
manifest_hash: &str,
content_hash: &str,
requester_grants: &[String],
) -> Result<Option<Vec<u8>>, SeedingError> {
let auth = self
.authorizations
.get(manifest_hash)
.ok_or_else(|| SeedingError::UnauthorizedManifest(manifest_hash.to_string()))?;
if !auth.is_valid() {
return Err(SeedingError::ExpiredAuthorization(
manifest_hash.to_string(),
));
}
if !requester_grants.contains(&auth.grant_scope) {
if auth.grant_scope != "public" && auth.grant_scope != "public-read" {
return Err(SeedingError::InsufficientGrants(auth.grant_scope.clone()));
}
}
let cache_key = CacheKey::new(
manifest_hash.to_string(),
content_hash.to_string(),
Some(auth.grant_scope.clone()),
);
match self.cache.get(&cache_key) {
Ok(Some(content)) => {
self.metrics.chunks_served += 1;
self.metrics.bytes_served += content.len() as u64;
Ok(Some(content))
}
Ok(None) => Ok(None),
Err(cache_error) => Err(SeedingError::CacheError(cache_error)),
}
}
pub fn add_seeded_content(
&mut self,
manifest_hash: &str,
content_hash: &str,
content: &[u8],
) -> Result<(), SeedingError> {
let auth = self
.authorizations
.get(manifest_hash)
.ok_or_else(|| SeedingError::UnauthorizedManifest(manifest_hash.to_string()))?;
if !auth.is_valid() {
return Err(SeedingError::ExpiredAuthorization(
manifest_hash.to_string(),
));
}
let cache_key = CacheKey::new(
manifest_hash.to_string(),
content_hash.to_string(),
Some(auth.grant_scope.clone()),
);
self.cache
.put(cache_key, content)
.map_err(SeedingError::CacheError)?;
self.metrics.chunks_stored += 1;
self.metrics.bytes_stored += content.len() as u64;
Ok(())
}
pub fn start_session(
&mut self,
peer_id: String,
manifest_hash: String,
requester_grants: Vec<String>,
) -> Result<String, SeedingError> {
if !self.config.enabled {
return Err(SeedingError::SeedingDisabled);
}
if let Some(max_conn) = self.config.max_concurrent_connections {
if self.active_sessions.len() >= max_conn as usize {
return Err(SeedingError::TooManyConnections);
}
}
let auth = self
.authorizations
.get(&manifest_hash)
.ok_or_else(|| SeedingError::UnauthorizedManifest(manifest_hash.clone()))?;
if !auth.is_valid() {
return Err(SeedingError::ExpiredAuthorization(manifest_hash));
}
if !requester_grants.contains(&auth.grant_scope) {
if auth.grant_scope != "public" && auth.grant_scope != "public-read" {
return Err(SeedingError::InsufficientGrants(auth.grant_scope.clone()));
}
}
let session_id = format!(
"seed_{}_{}",
peer_id,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
);
let session = SeedingSession {
session_id: session_id.clone(),
peer_id,
manifest_hash,
started_at: SystemTime::now(),
bytes_sent: 0,
chunks_sent: 0,
priority: auth.priority.clone(),
};
self.active_sessions.insert(session_id.clone(), session);
self.metrics.sessions_started += 1;
Ok(session_id)
}
pub fn end_session(&mut self, session_id: &str) -> Result<(), SeedingError> {
if let Some(session) = self.active_sessions.remove(session_id) {
self.metrics.sessions_completed += 1;
self.metrics.total_session_duration +=
session.started_at.elapsed().unwrap_or(Duration::ZERO);
}
Ok(())
}
#[must_use]
pub const fn metrics(&self) -> &SeedingMetrics {
&self.metrics
}
#[must_use]
pub fn authorized_manifests(&self) -> Vec<String> {
self.authorizations.keys().cloned().collect()
}
}
/// Bookkeeping for one active seeding session with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SeedingSession {
/// Identifier returned by `AtpSeedingService::start_session`; also the key
/// under which the session is tracked.
pub session_id: String,
/// Peer the session was opened for.
pub peer_id: String,
/// Manifest being seeded; sessions are dropped when this manifest is revoked.
pub manifest_hash: String,
/// Time at which the session was opened.
pub started_at: SystemTime,
/// Bytes sent in this session. Initialized to 0 by `start_session`.
/// NOTE(review): not updated anywhere in this file — confirm who maintains it.
pub bytes_sent: u64,
/// Chunks sent in this session. Initialized to 0 by `start_session`.
/// NOTE(review): not updated anywhere in this file — confirm who maintains it.
pub chunks_sent: u64,
/// Priority level name copied from the manifest's authorization.
pub priority: String,
}
/// Running counters maintained by `AtpSeedingService`; all start at zero.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SeedingMetrics {
/// Number of cache hits returned by `get_seeded_content`.
pub chunks_served: u64,
/// Total size in bytes of the content returned by `get_seeded_content`.
pub bytes_served: u64,
/// Number of successful `add_seeded_content` calls.
pub chunks_stored: u64,
/// Total size in bytes of the content stored by `add_seeded_content`.
pub bytes_stored: u64,
/// Number of sessions successfully opened by `start_session`.
pub sessions_started: u64,
/// Number of sessions closed through `end_session`.
pub sessions_completed: u64,
/// Sum of the durations of all sessions closed through `end_session`.
pub total_session_duration: Duration,
/// Count of authorization failures.
/// NOTE(review): confirm which code paths are expected to update this.
pub authorization_failures: u64,
}
/// Errors returned by the seeding service.
#[derive(Debug, thiserror::Error)]
pub enum SeedingError {
/// Seeding is switched off in the configuration.
#[error("Seeding is disabled")]
SeedingDisabled,
/// No authorization exists for the manifest; carries the manifest hash.
#[error("Unauthorized manifest: {0}")]
UnauthorizedManifest(String),
/// The manifest's authorization is inactive or past its expiry; carries the
/// manifest hash.
#[error("Expired authorization for manifest: {0}")]
ExpiredAuthorization(String),
/// The requester does not hold the required grant scope; carries that scope.
#[error("Insufficient grants: {0}")]
InsufficientGrants(String),
/// The requested priority is not one of the configured levels; carries the
/// rejected name.
#[error("Invalid priority: {0}")]
InvalidPriority(String),
/// The configured limit on concurrent sessions has been reached.
#[error("Too many concurrent connections")]
TooManyConnections,
/// The underlying cache reported an error.
#[error("Cache error: {0}")]
CacheError(#[from] CacheError),
/// An identity-layer error, convertible via `From<IdentityError>`.
/// NOTE(review): never constructed in this file — confirm which callers
/// rely on the conversion.
#[error("Identity error: {0}")]
Identity(#[from] IdentityError),
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::atp::cache::{AtpCache, CacheConfig};

    /// Default configuration with only the master switch turned on.
    fn enabled_config() -> SeedingConfig {
        SeedingConfig {
            enabled: true,
            ..SeedingConfig::default()
        }
    }

    /// Builds a service over a cache with default settings.
    fn service_with(config: SeedingConfig) -> AtpSeedingService {
        AtpSeedingService::new(config, AtpCache::new(CacheConfig::default()))
    }

    /// The default configuration is off, demands explicit grants and ships
    /// three priority levels.
    #[test]
    fn seeding_config_defaults() {
        let defaults = SeedingConfig::default();
        assert!(!defaults.enabled);
        assert!(defaults.require_explicit_grants);
        assert_eq!(defaults.priority_levels.len(), 3);
    }

    /// Presets are strictly ordered high > normal > low in both rank and
    /// bandwidth ratio.
    #[test]
    fn seeding_priority_configurations() {
        let tiers = [
            SeedingPriority::high(),
            SeedingPriority::normal(),
            SeedingPriority::low(),
        ];
        for pair in tiers.windows(2) {
            let (upper, lower) = (&pair[0], &pair[1]);
            assert!(upper.priority > lower.priority);
            assert!(upper.bandwidth_ratio > lower.bandwidth_ratio);
        }
    }

    /// Validity follows both the expiry instant and the active flag.
    #[test]
    fn manifest_authorization_validity() {
        let mut grant = ManifestAuthorization::new(
            String::from("manifest123"),
            String::from("scope456"),
            String::from("high"),
        );
        assert!(grant.is_valid());

        // An expiry in the past invalidates the authorization.
        grant.expires_at = Some(SystemTime::UNIX_EPOCH);
        assert!(!grant.is_valid());

        // So does deactivating it, even with no expiry.
        grant.expires_at = None;
        grant.active = false;
        assert!(!grant.is_valid());
    }

    /// Authorizing then revoking a manifest toggles `is_authorized`.
    #[test]
    fn seeding_service_authorization() {
        let mut seeder = service_with(enabled_config());
        assert!(!seeder.is_authorized("manifest123"));

        let authorized = seeder.authorize_manifest(
            String::from("manifest123"),
            String::from("scope456"),
            String::from("high"),
        );
        assert!(authorized.is_ok());
        assert!(seeder.is_authorized("manifest123"));

        let revoked = seeder.revoke_manifest("manifest123");
        assert!(revoked.is_ok());
        assert!(!seeder.is_authorized("manifest123"));
    }

    /// With the default (disabled) configuration, authorization is refused.
    #[test]
    fn seeding_service_disabled_by_default() {
        let mut seeder = service_with(SeedingConfig::default());
        let outcome = seeder.authorize_manifest(
            String::from("manifest123"),
            String::from("scope456"),
            String::from("high"),
        );
        assert!(matches!(outcome, Err(SeedingError::SeedingDisabled)));
    }

    /// Starting and ending a session updates the session table and counters.
    #[test]
    fn seeding_session_management() {
        let mut seeder = service_with(SeedingConfig {
            max_concurrent_connections: Some(2),
            ..enabled_config()
        });
        seeder
            .authorize_manifest(
                String::from("manifest123"),
                String::from("scope456"),
                String::from("high"),
            )
            .unwrap();

        let opened = seeder
            .start_session(
                String::from("peer1"),
                String::from("manifest123"),
                vec![String::from("scope456")],
            )
            .unwrap();
        assert!(seeder.active_sessions.contains_key(&opened));
        assert_eq!(seeder.metrics().sessions_started, 1);

        seeder.end_session(&opened).unwrap();
        assert!(!seeder.active_sessions.contains_key(&opened));
        assert_eq!(seeder.metrics().sessions_completed, 1);
    }

    /// Grant checks apply even when `require_explicit_grants` is off, while
    /// public scopes stay reachable with unrelated grants.
    #[test]
    fn grant_verification_cannot_be_bypassed() {
        let mut seeder = service_with(SeedingConfig {
            require_explicit_grants: false,
            ..enabled_config()
        });
        seeder
            .authorize_manifest(
                String::from("manifest123"),
                String::from("private-scope"),
                String::from("high"),
            )
            .unwrap();

        // A requester holding only an unrelated scope is refused content...
        let wrong_scope = [String::from("wrong-scope")];
        let fetched = seeder.get_seeded_content("manifest123", "content456", &wrong_scope);
        assert!(matches!(fetched, Err(SeedingError::InsufficientGrants(_))));

        // ...and is refused a session as well.
        let opened = seeder.start_session(
            String::from("peer1"),
            String::from("manifest123"),
            vec![String::from("wrong-scope")],
        );
        assert!(matches!(opened, Err(SeedingError::InsufficientGrants(_))));

        seeder
            .authorize_manifest(
                String::from("public-manifest"),
                String::from("public"),
                String::from("high"),
            )
            .unwrap();

        // Public content passes the grant check; the cache is empty, so the
        // lookup yields a miss or a cache error.
        let any_scope = [String::from("any-scope")];
        let fetched = seeder.get_seeded_content("public-manifest", "content789", &any_scope);
        assert!(matches!(
            fetched,
            Ok(None) | Err(SeedingError::CacheError(_))
        ));
    }
}