pub mod buffer_sink;
pub mod object_sink;
pub mod stream_sink;
pub mod writer;
use crate::atp::object::{ContentId, ObjectId, ObjectKind};
use crate::cx::Cx;
use crate::types::outcome::Outcome;
use sha2::{Digest, Sha256};
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::time::{Duration, SystemTime};
pub trait AtpWriter {
type Error: std::error::Error + Send + Sync + 'static;
fn write_buffer(
&mut self,
cx: &Cx,
data: &[u8],
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn write_file(
&mut self,
cx: &Cx,
file_path: &Path,
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn write_directory(
&mut self,
cx: &Cx,
dir_path: &Path,
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn write_stream<S>(
&mut self,
cx: &Cx,
stream: S,
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send
where
S: futures::Stream<Item = Result<Vec<u8>, Self::Error>> + Send + Unpin;
fn write_object(
&mut self,
cx: &Cx,
object: impl AtpObject,
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn resume_transfer(
&mut self,
cx: &Cx,
resume_token: ResumeToken,
options: WriteOptions,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn get_progress(&self, transfer_id: TransferId) -> Option<TransferProgress>;
fn cancel_transfer(
&mut self,
transfer_id: TransferId,
) -> impl Future<Output = Outcome<CancellationResult, Self::Error>> + Send;
}
pub trait AtpSink {
type Error: std::error::Error + Send + Sync + 'static;
fn start_stream(
&mut self,
cx: &Cx,
options: StreamOptions,
) -> impl Future<Output = Outcome<StreamHandle, Self::Error>> + Send;
fn write_chunk(
&mut self,
stream: &StreamHandle,
chunk: &[u8],
) -> impl Future<Output = Outcome<ChunkAck, Self::Error>> + Send;
fn finish_stream(
&mut self,
stream: StreamHandle,
) -> impl Future<Output = Outcome<WriteResult, Self::Error>> + Send;
fn backpressure_state(&self, stream: &StreamHandle) -> BackpressureState;
}
#[derive(Debug, Clone)]
pub struct WriteOptions {
pub priority: u8,
pub report_progress: bool,
pub progress_interval: Duration,
pub allow_early_consumption: bool,
pub chunking_strategy: Option<ChunkingStrategy>,
pub compression: CompressionPreference,
pub encryption: EncryptionPreference,
pub resume_behavior: ResumeBehavior,
pub proof_requirements: ProofRequirements,
pub timeout: Option<Duration>,
pub metadata: std::collections::HashMap<String, String>,
}
impl Default for WriteOptions {
fn default() -> Self {
Self {
priority: 128, report_progress: true,
progress_interval: Duration::from_secs(1),
allow_early_consumption: false,
chunking_strategy: None, compression: CompressionPreference::Auto,
encryption: EncryptionPreference::Required,
resume_behavior: ResumeBehavior::EnableResume,
proof_requirements: ProofRequirements::Standard,
timeout: None,
metadata: std::collections::HashMap::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct StreamOptions {
pub expected_size: Option<u64>,
pub max_chunk_size: usize,
pub backpressure_threshold: usize,
pub write_options: WriteOptions,
}
impl Default for StreamOptions {
fn default() -> Self {
Self {
expected_size: None,
max_chunk_size: 64 * 1024, backpressure_threshold: 10, write_options: WriteOptions::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct WriteResult {
pub transfer_id: TransferId,
pub object_id: ObjectId,
pub total_bytes: u64,
pub chunk_count: u64,
pub completed_at: SystemTime,
pub proof: TransferProof,
pub resume_token: Option<ResumeToken>,
pub verified_prefix_bytes: u64,
pub verification_status: VerificationStatus,
pub metrics: TransferMetrics,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTransferProof {
pub chunk_index: u64,
pub byte_offset: u64,
pub size_bytes: u64,
pub content_hash: [u8; 32],
}
#[derive(Debug, Clone)]
pub struct TransferProof {
pub transfer_id: TransferId,
pub object_id: ObjectId,
pub content_hash: [u8; 32],
pub manifest_root: [u8; 32],
pub total_bytes: u64,
pub chunk_count: u64,
pub chunks: Vec<ChunkTransferProof>,
pub completed_at: SystemTime,
}
impl TransferProof {
#[must_use]
pub fn from_chunk_proofs(
transfer_id: TransferId,
mut chunks: Vec<ChunkTransferProof>,
completed_at: SystemTime,
) -> Self {
chunks.sort_by_key(|chunk| chunk.chunk_index);
let mut content_hasher = Sha256::new();
content_hasher.update(b"asupersync.atp.sink.transfer.content.v1\0");
content_hasher.update(transfer_id.0);
let mut manifest_hasher = Sha256::new();
manifest_hasher.update(b"asupersync.atp.sink.transfer.manifest.v1\0");
manifest_hasher.update(transfer_id.0);
let mut total_bytes = 0_u64;
for chunk in &chunks {
content_hasher.update(chunk.chunk_index.to_be_bytes());
content_hasher.update(chunk.byte_offset.to_be_bytes());
content_hasher.update(chunk.size_bytes.to_be_bytes());
content_hasher.update(chunk.content_hash);
manifest_hasher.update(chunk.chunk_index.to_be_bytes());
manifest_hasher.update(chunk.byte_offset.to_be_bytes());
manifest_hasher.update(chunk.size_bytes.to_be_bytes());
manifest_hasher.update(chunk.content_hash);
total_bytes = total_bytes.saturating_add(chunk.size_bytes);
}
let content_hash: [u8; 32] = content_hasher.finalize().into();
let manifest_root: [u8; 32] = manifest_hasher.finalize().into();
let chunk_count = chunks.len() as u64;
Self {
transfer_id,
object_id: ObjectId::content(ContentId::new(content_hash)),
content_hash,
manifest_root,
total_bytes,
chunk_count,
chunks,
completed_at,
}
}
}
#[derive(Debug, Clone)]
pub struct TransferProgress {
pub transfer_id: TransferId,
pub bytes_transferred: u64,
pub total_bytes: Option<u64>,
pub chunks_completed: u64,
pub chunks_remaining: Option<u64>,
pub transfer_rate: f64,
pub eta: Option<Duration>,
pub timestamp: SystemTime,
pub phase: TransferPhase,
pub verified_bytes: u64,
}
#[derive(Debug, Clone)]
pub struct CancellationResult {
pub transfer_id: TransferId,
pub cancelled_at: SystemTime,
pub final_state: CancellationState,
pub resume_token: Option<ResumeToken>,
pub partial_proof: Option<TransferProof>,
pub cleanup_required: Vec<CleanupAction>,
}
#[derive(Debug, Clone)]
pub struct StreamHandle {
pub stream_id: StreamId,
pub transfer_id: TransferId,
pub max_chunk_size: usize,
pub sequence_number: u64,
}
#[derive(Debug, Clone)]
pub struct ChunkAck {
pub sequence_number: u64,
pub bytes_acked: u64,
pub backpressure_level: f32, pub next_ack_eta: Option<Duration>,
}
#[derive(Debug, Clone)]
pub struct BackpressureState {
pub queue_depth: usize,
pub max_queue_depth: usize,
pub pressure_level: f32,
pub recommended_delay: Option<Duration>,
}
pub trait AtpObject: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
fn object_kind(&self) -> ObjectKind;
fn size_hint(&self) -> Option<u64>;
fn serialize_chunks(&self) -> impl Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send;
fn metadata(&self) -> std::collections::HashMap<String, String>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(pub [u8; 16]);
impl TransferId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4().into_bytes())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StreamId(pub [u8; 16]);
impl StreamId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4().into_bytes())
}
}
#[derive(Debug, Clone)]
pub struct ResumeToken {
pub transfer_id: TransferId,
pub checkpoint_data: Vec<u8>,
pub expires_at: SystemTime,
pub required_capabilities: Vec<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkingStrategy {
FixedSize,
ContentDefined,
Adaptive,
ApplicationDefined,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionPreference {
None,
Auto,
Force,
Algorithm(&'static str),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncryptionPreference {
None,
Required,
Opportunistic,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResumeBehavior {
EnableResume,
DisableResume,
ResumeOnDemand,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProofRequirements {
Standard,
Enhanced,
Minimal,
None,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferPhase {
Initializing,
Chunking,
Transferring,
Verifying,
Finalizing,
Completed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationStatus {
Pending,
Verified,
Failed,
Skipped,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancellationState {
Clean,
Resumable,
Quarantined,
PartiallyCompleted,
}
#[derive(Debug, Clone)]
pub enum CleanupAction {
RemoveTemporaryFiles(Vec<std::path::PathBuf>),
ClearCacheEntries(Vec<String>),
ReleaseResources(Vec<String>),
NotifyPeers(Vec<String>),
}
#[derive(Debug, Clone)]
pub struct TransferMetrics {
pub duration: Duration,
pub avg_transfer_rate: f64,
pub peak_transfer_rate: f64,
pub phase_durations: std::collections::HashMap<TransferPhase, Duration>,
pub round_trips: u64,
pub retransmissions: u64,
pub compression_ratio: f32,
pub deduplication_savings: u64,
}
#[derive(Debug, thiserror::Error)]
pub enum WriteError {
#[error("Transfer cancelled by user")]
Cancelled,
#[error("Transfer timed out after {duration:?}")]
Timeout { duration: Duration },
#[error("Insufficient space: need {required} bytes, have {available} bytes")]
InsufficientSpace { required: u64, available: u64 },
#[error("Permission denied: {reason}")]
PermissionDenied { reason: String },
#[error("Network error: {source}")]
NetworkError {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Verification failed: {reason}")]
VerificationFailed { reason: String },
#[error("Resume failed: {reason}")]
ResumeFailed { reason: String },
#[error("Backpressure exceeded: {current_depth}/{max_depth}")]
BackpressureExceeded {
current_depth: usize,
max_depth: usize,
},
#[error("Invalid transfer ID: {transfer_id:?}")]
InvalidTransferId { transfer_id: TransferId },
#[error("Transfer already completed")]
AlreadyCompleted,
#[error("Transfer not found")]
TransferNotFound,
#[error("Invalid resume token")]
InvalidResumeToken,
#[error("Quota exceeded: {current}/{limit} bytes")]
QuotaExceeded { current: u64, limit: u64 },
#[error("Internal error: {message}")]
Internal { message: String },
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_transfer_id_creation() {
let id1 = TransferId::new();
let id2 = TransferId::new();
assert_ne!(id1, id2);
assert_eq!(id1.0.len(), 16);
}
#[test]
fn test_write_options_default() {
let options = WriteOptions::default();
assert_eq!(options.priority, 128);
assert_eq!(options.report_progress, true);
assert_eq!(options.compression, CompressionPreference::Auto);
assert_eq!(options.encryption, EncryptionPreference::Required);
}
#[test]
fn test_stream_options_default() {
let options = StreamOptions::default();
assert_eq!(options.max_chunk_size, 64 * 1024);
assert_eq!(options.backpressure_threshold, 10);
assert_eq!(options.expected_size, None);
}
#[test]
fn test_backpressure_calculation() {
let state = BackpressureState {
queue_depth: 7,
max_queue_depth: 10,
pressure_level: 0.7,
recommended_delay: Some(Duration::from_millis(100)),
};
assert_eq!(state.pressure_level, 0.7);
assert!(state.recommended_delay.is_some());
}
}