#![expect(
dead_code,
reason = "low-level transfer endpoints are internal behind generated operation helpers"
)]
use std::collections::BTreeMap;
use std::time::Duration;
use async_nats::header::HeaderMap;
use bytes::{Bytes, BytesMut};
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::sync::oneshot;
use super::request_loop::encode_error_reply;
use super::{
OperationTransferProgress, RequestContext, RequestValidator, ServerError,
ServiceResourceBindings, StoreResourceBinding, StoreResourceClient,
};
const UPLOAD_SUBJECT_PREFIX: &str = "transfer.v1.upload";
const DOWNLOAD_SUBJECT_PREFIX: &str = "transfer.v1.download";
pub const TRANSFER_SEQUENCE_HEADER: &str = "trellis-transfer-seq";
pub const TRANSFER_EOF_HEADER: &str = "trellis-transfer-eof";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct FileTransferInfo {
pub key: String,
pub size: u64,
pub updated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub digest: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content_type: Option<String>,
pub metadata: BTreeMap<String, String>,
}
#[derive(Debug)]
pub struct TransferUploadGrantArgs<'a> {
pub service_name: &'a str,
pub session_key: &'a str,
pub service_session_key: &'a str,
pub resources: &'a ServiceResourceBindings,
pub store: &'a str,
pub key: &'a str,
pub transfer_id: &'a str,
pub expires_at: &'a str,
pub chunk_bytes: u64,
pub max_bytes: Option<u64>,
pub content_type: Option<&'a str>,
pub metadata: BTreeMap<String, String>,
}
#[derive(Debug)]
pub struct TransferDownloadGrantArgs<'a> {
pub service_name: &'a str,
pub session_key: &'a str,
pub service_session_key: &'a str,
pub resources: &'a ServiceResourceBindings,
pub store: &'a str,
pub transfer_id: &'a str,
pub expires_at: &'a str,
pub chunk_bytes: u64,
pub info: FileTransferInfo,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct UploadTransferGrant {
#[serde(rename = "type")]
pub type_name: String,
pub direction: String,
pub service: String,
pub session_key: String,
pub transfer_id: String,
pub subject: String,
pub expires_at: String,
pub chunk_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content_type: Option<String>,
pub metadata: BTreeMap<String, String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadTransferGrantPlan {
pub grant: UploadTransferGrant,
pub store_alias: String,
pub store: String,
pub key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DownloadTransferGrant {
#[serde(rename = "type")]
pub type_name: String,
pub direction: String,
pub service: String,
pub session_key: String,
pub transfer_id: String,
pub subject: String,
pub expires_at: String,
pub chunk_bytes: u64,
pub info: FileTransferInfo,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadTransferGrantPlan {
pub grant: DownloadTransferGrant,
pub store_alias: String,
pub store: String,
pub(crate) max_object_bytes: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadTransferChunk {
pub seq: u64,
pub payload: Bytes,
pub eof: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum UploadTransferAck {
Continue,
Complete {
info: FileTransferInfo,
},
}
#[derive(Debug)]
pub struct UploadTransferCompletion {
receiver: oneshot::Receiver<Result<FileTransferInfo, ServerError>>,
}
impl UploadTransferCompletion {
pub async fn completed(self) -> Result<FileTransferInfo, ServerError> {
self.receiver.await.map_err(|_| {
ServerError::Nats("upload transfer completion channel closed".to_string())
})?
}
}
#[derive(Debug, Clone)]
pub struct UploadTransferSession {
plan: UploadTransferGrantPlan,
bytes: BytesMut,
next_seq: u64,
complete: bool,
updated_at: String,
}
impl UploadTransferSession {
pub fn new(plan: UploadTransferGrantPlan, updated_at: impl Into<String>) -> Self {
Self {
plan,
bytes: BytesMut::new(),
next_seq: 0,
complete: false,
updated_at: updated_at.into(),
}
}
pub fn subject(&self) -> &str {
&self.plan.grant.subject
}
pub fn session_key(&self) -> &str {
&self.plan.grant.session_key
}
pub fn progress_for_chunk(&self, chunk: &UploadTransferChunk) -> OperationTransferProgress {
OperationTransferProgress {
chunk_index: chunk.seq,
chunk_bytes: chunk.payload.len() as u64,
transferred_bytes: self.bytes.len() as u64 + chunk.payload.len() as u64,
}
}
pub async fn receive<C>(
&mut self,
store: &C,
chunk: UploadTransferChunk,
) -> Result<UploadTransferAck, ServerError>
where
C: StoreResourceClient,
{
let now = current_time_iso()?;
self.receive_at(store, chunk, &now).await
}
pub async fn receive_at<C>(
&mut self,
store: &C,
chunk: UploadTransferChunk,
now_iso: &str,
) -> Result<UploadTransferAck, ServerError>
where
C: StoreResourceClient,
{
if self.complete {
return Err(ServerError::TransferAlreadyComplete {
transfer_id: self.plan.grant.transfer_id.clone(),
});
}
enforce_transfer_not_expired(
&self.plan.grant.transfer_id,
&self.plan.grant.expires_at,
now_iso,
)?;
if chunk.seq != self.next_seq {
return Err(ServerError::TransferSequenceOutOfOrder {
transfer_id: self.plan.grant.transfer_id.clone(),
expected_seq: self.next_seq,
actual_seq: chunk.seq,
});
}
let chunk_limit = self.plan.grant.chunk_bytes;
if chunk.payload.len() as u64 > chunk_limit {
return Err(ServerError::TransferObjectTooLarge {
service_name: self.plan.grant.service.clone(),
store: self.plan.store_alias.clone(),
key: self.plan.key.clone(),
size: chunk.payload.len() as u64,
max_bytes: chunk_limit,
});
}
let next_size = self.bytes.len() as u64 + chunk.payload.len() as u64;
enforce_upload_max_bytes(&self.plan, next_size)?;
if !chunk.eof {
self.bytes.extend_from_slice(&chunk.payload);
self.next_seq += 1;
return Ok(UploadTransferAck::Continue);
}
let mut completed_bytes = self.bytes.clone();
completed_bytes.extend_from_slice(&chunk.payload);
let info = FileTransferInfo {
key: self.plan.key.clone(),
size: completed_bytes.len() as u64,
updated_at: self.updated_at.clone(),
digest: None,
content_type: self.plan.grant.content_type.clone(),
metadata: self.plan.grant.metadata.clone(),
};
store
.write(&self.plan.key, completed_bytes.clone().freeze())
.await?;
self.bytes = completed_bytes;
self.next_seq += 1;
self.complete = true;
Ok(UploadTransferAck::Complete { info })
}
pub fn ensure_complete(&self) -> Result<(), ServerError> {
if self.complete {
Ok(())
} else {
Err(ServerError::TransferMissingEof {
transfer_id: self.plan.grant.transfer_id.clone(),
})
}
}
}
pub fn decode_upload_transfer_chunk(
headers: Option<&HeaderMap>,
payload: Bytes,
) -> Result<UploadTransferChunk, ServerError> {
let seq = required_header(headers, TRANSFER_SEQUENCE_HEADER)?;
let seq = seq
.parse::<u64>()
.map_err(|_| ServerError::InvalidTransferHeader {
header: TRANSFER_SEQUENCE_HEADER,
value: seq.to_string(),
})?;
let eof = optional_header(headers, TRANSFER_EOF_HEADER).is_some_and(|value| value == "true");
Ok(UploadTransferChunk { seq, payload, eof })
}
pub async fn run_upload_transfer_endpoint<C, V>(
client: async_nats::Client,
subscriber: impl futures_util::Stream<Item = async_nats::Message>,
session: UploadTransferSession,
store: C,
validator: V,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
{
run_upload_transfer_endpoint_with_progress(
client,
subscriber,
session,
store,
validator,
|_| {},
)
.await
}
pub async fn run_upload_transfer_endpoint_with_progress<C, V, F>(
client: async_nats::Client,
subscriber: impl futures_util::Stream<Item = async_nats::Message>,
session: UploadTransferSession,
store: C,
validator: V,
on_progress: F,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
F: Fn(OperationTransferProgress) + Send + Sync + 'static,
{
run_upload_transfer_endpoint_inner(
client,
subscriber,
session,
store,
validator,
on_progress,
None,
)
.await
}
async fn run_upload_transfer_endpoint_inner<C, V, F>(
client: async_nats::Client,
subscriber: impl futures_util::Stream<Item = async_nats::Message>,
mut session: UploadTransferSession,
store: C,
validator: V,
on_progress: F,
mut completion: Option<oneshot::Sender<Result<FileTransferInfo, ServerError>>>,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
F: Fn(OperationTransferProgress) + Send + Sync + 'static,
{
let mut subscriber = Box::pin(subscriber);
let expiry = tokio::time::sleep(transfer_expiry_delay(&session.plan.grant.expires_at)?);
tokio::pin!(expiry);
loop {
tokio::select! {
_ = &mut expiry => {
if let Some(sender) = completion.take() {
let _ = sender.send(Err(ServerError::TransferExpired {
transfer_id: session.plan.grant.transfer_id.clone(),
expires_at: session.plan.grant.expires_at.clone(),
}));
}
return Ok(());
}
maybe_message = subscriber.next() => {
let Some(message) = maybe_message else {
break;
};
let reply_to = message.reply.as_ref().map(ToString::to_string);
let result = handle_upload_transfer_message(&mut session, &store, &validator, &message).await;
match result {
Ok((ack, progress)) => {
match &ack {
UploadTransferAck::Continue if progress.chunk_bytes > 0 => {
on_progress(progress);
}
UploadTransferAck::Complete { info } => {
if let Some(sender) = completion.take() {
let _ = sender.send(Ok(info.clone()));
}
}
UploadTransferAck::Continue => {}
}
if let Some(reply_to) = reply_to {
client
.publish(reply_to, Bytes::from(serde_json::to_vec(&ack)?))
.await
.map_err(|error| ServerError::Nats(error.to_string()))?;
}
}
Err(error) => {
if let Some(sender) = completion.take() {
let _ = sender.send(Err(transfer_completion_error(&error)));
}
if let Some(reply_to) = reply_to {
publish_error_reply(&client, reply_to, &error).await?;
}
return Ok(());
}
}
}
}
}
if let Some(sender) = completion.take() {
let _ = sender.send(Err(ServerError::TransferMissingEof {
transfer_id: session.plan.grant.transfer_id.clone(),
}));
}
Ok(())
}
fn transfer_completion_error(error: &ServerError) -> ServerError {
match error {
ServerError::TransferSessionMismatch {
subject,
actual_session_key,
} => ServerError::TransferSessionMismatch {
subject: subject.clone(),
actual_session_key: actual_session_key.clone(),
},
ServerError::MissingSessionKey { subject } => ServerError::MissingSessionKey {
subject: subject.clone(),
},
ServerError::MissingProof { subject } => ServerError::MissingProof {
subject: subject.clone(),
},
ServerError::RequestDenied {
subject,
session_key,
} => ServerError::RequestDenied {
subject: subject.clone(),
session_key: session_key.clone(),
},
ServerError::ReplyInboxMismatch {
subject,
session_key,
reply_to,
} => ServerError::ReplyInboxMismatch {
subject: subject.clone(),
session_key: session_key.clone(),
reply_to: reply_to.clone(),
},
ServerError::TransferObjectTooLarge {
service_name,
store,
key,
size,
max_bytes,
} => ServerError::TransferObjectTooLarge {
service_name: service_name.clone(),
store: store.clone(),
key: key.clone(),
size: *size,
max_bytes: *max_bytes,
},
ServerError::TransferSequenceOutOfOrder {
transfer_id,
expected_seq,
actual_seq,
} => ServerError::TransferSequenceOutOfOrder {
transfer_id: transfer_id.clone(),
expected_seq: *expected_seq,
actual_seq: *actual_seq,
},
ServerError::TransferMissingEof { transfer_id } => ServerError::TransferMissingEof {
transfer_id: transfer_id.clone(),
},
ServerError::TransferAlreadyComplete { transfer_id } => {
ServerError::TransferAlreadyComplete {
transfer_id: transfer_id.clone(),
}
}
ServerError::InvalidTransferId { value } => ServerError::InvalidTransferId {
value: value.clone(),
},
ServerError::TransferExpired {
transfer_id,
expires_at,
} => ServerError::TransferExpired {
transfer_id: transfer_id.clone(),
expires_at: expires_at.clone(),
},
ServerError::InvalidTransferExpiry {
expires_at,
details,
} => ServerError::InvalidTransferExpiry {
expires_at: expires_at.clone(),
details: details.clone(),
},
ServerError::TransferObjectMissing { store, key } => ServerError::TransferObjectMissing {
store: store.clone(),
key: key.clone(),
},
ServerError::InvalidTransferChunkSize { chunk_bytes } => {
ServerError::InvalidTransferChunkSize {
chunk_bytes: *chunk_bytes,
}
}
ServerError::MissingTransferHeader { header } => {
ServerError::MissingTransferHeader { header }
}
ServerError::InvalidTransferHeader { header, value } => {
ServerError::InvalidTransferHeader {
header,
value: value.clone(),
}
}
ServerError::TransferObjectSizeMismatch {
store,
key,
expected_size,
actual_size,
} => ServerError::TransferObjectSizeMismatch {
store: store.clone(),
key: key.clone(),
expected_size: *expected_size,
actual_size: *actual_size,
},
_ => ServerError::Nats(error.to_string()),
}
}
pub async fn run_download_transfer_endpoint<C, V>(
client: async_nats::Client,
subscriber: impl futures_util::Stream<Item = async_nats::Message>,
plan: DownloadTransferGrantPlan,
store: C,
validator: V,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
{
let mut subscriber = Box::pin(subscriber);
while let Some(message) = subscriber.next().await {
let Some(reply_to) = message.reply.as_ref().map(ToString::to_string) else {
continue;
};
match handle_download_transfer_message(&plan, &store, &validator, &message).await {
Ok(chunks) => publish_download_chunks(&client, reply_to, chunks).await?,
Err(error) => publish_error_reply(&client, reply_to, &error).await?,
}
}
Ok(())
}
pub async fn spawn_upload_transfer_endpoint<C, V>(
client: async_nats::Client,
session: UploadTransferSession,
store: C,
validator: V,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
{
spawn_upload_transfer_endpoint_with_progress(client, session, store, validator, |_| {}).await
}
pub async fn spawn_upload_transfer_endpoint_with_progress<C, V, F>(
client: async_nats::Client,
session: UploadTransferSession,
store: C,
validator: V,
on_progress: F,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
F: Fn(OperationTransferProgress) + Send + Sync + 'static,
{
let subject = session.subject().to_string();
tracing::info!(subject = %subject, "subscribing upload transfer endpoint");
let subscriber = client.subscribe(subject.clone()).await.map_err(|error| {
ServerError::Nats(format!(
"failed to subscribe to upload transfer subject '{subject}': {error}"
))
})?;
client.flush().await.map_err(|error| {
ServerError::Nats(format!(
"failed to flush upload transfer subscription '{subject}': {error}"
))
})?;
tracing::debug!(subject = %subject, "upload transfer subscription flushed");
tokio::spawn(async move {
tracing::debug!(subject = %subject, "upload transfer endpoint task started");
if let Err(error) = run_upload_transfer_endpoint_with_progress(
client,
subscriber,
session,
store,
validator,
on_progress,
)
.await
{
tracing::error!(subject = %subject, error = %error, "upload transfer endpoint failed");
}
tracing::debug!(subject = %subject, "upload transfer endpoint task ended");
});
Ok(())
}
pub async fn spawn_upload_transfer_endpoint_with_completion<C, V>(
client: async_nats::Client,
session: UploadTransferSession,
store: C,
validator: V,
) -> Result<UploadTransferCompletion, ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
{
spawn_upload_transfer_endpoint_with_progress_and_completion(
client,
session,
store,
validator,
|_| {},
)
.await
}
pub async fn spawn_upload_transfer_endpoint_with_progress_and_completion<C, V, F>(
client: async_nats::Client,
session: UploadTransferSession,
store: C,
validator: V,
on_progress: F,
) -> Result<UploadTransferCompletion, ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
F: Fn(OperationTransferProgress) + Send + Sync + 'static,
{
let subject = session.subject().to_string();
tracing::info!(subject = %subject, "subscribing upload transfer endpoint");
let subscriber = client.subscribe(subject.clone()).await.map_err(|error| {
ServerError::Nats(format!(
"failed to subscribe to upload transfer subject '{subject}': {error}"
))
})?;
client.flush().await.map_err(|error| {
ServerError::Nats(format!(
"failed to flush upload transfer subscription '{subject}': {error}"
))
})?;
let (sender, receiver) = oneshot::channel();
tracing::debug!(subject = %subject, "upload transfer subscription flushed");
tokio::spawn(async move {
tracing::debug!(subject = %subject, "upload transfer endpoint task started");
if let Err(error) = run_upload_transfer_endpoint_inner(
client,
subscriber,
session,
store,
validator,
on_progress,
Some(sender),
)
.await
{
tracing::error!(subject = %subject, error = %error, "upload transfer endpoint failed");
}
tracing::debug!(subject = %subject, "upload transfer endpoint task ended");
});
Ok(UploadTransferCompletion { receiver })
}
pub async fn spawn_download_transfer_endpoint<C, V>(
client: async_nats::Client,
plan: DownloadTransferGrantPlan,
store: C,
validator: V,
) -> Result<(), ServerError>
where
C: StoreResourceClient,
V: RequestValidator + 'static,
{
let subject = plan.grant.subject.clone();
tracing::info!(subject = %subject, "subscribing download transfer endpoint");
let subscriber = client.subscribe(subject.clone()).await.map_err(|error| {
ServerError::Nats(format!(
"failed to subscribe to download transfer subject '{subject}': {error}"
))
})?;
client.flush().await.map_err(|error| {
ServerError::Nats(format!(
"failed to flush download transfer subscription '{subject}': {error}"
))
})?;
tracing::debug!(subject = %subject, "download transfer subscription flushed");
tokio::spawn(async move {
tracing::debug!(subject = %subject, "download transfer endpoint task started");
if let Err(error) =
run_download_transfer_endpoint(client, subscriber, plan, store, validator).await
{
tracing::error!(subject = %subject, error = %error, "download transfer endpoint failed");
}
tracing::debug!(subject = %subject, "download transfer endpoint task ended");
});
Ok(())
}
async fn handle_upload_transfer_message<C, V>(
session: &mut UploadTransferSession,
store: &C,
validator: &V,
message: &async_nats::Message,
) -> Result<(UploadTransferAck, OperationTransferProgress), ServerError>
where
C: StoreResourceClient,
V: RequestValidator,
{
let context = transfer_request_context(message);
validate_transfer_request(
session.subject(),
&message.payload,
&context,
session.session_key(),
validator,
)
.await?;
let chunk = decode_upload_transfer_chunk(message.headers.as_ref(), message.payload.clone())?;
let progress = session.progress_for_chunk(&chunk);
tracing::debug!(
subject = %session.subject(),
seq = chunk.seq,
bytes = chunk.payload.len(),
eof = chunk.eof,
"received upload transfer chunk"
);
let now = current_time_iso()?;
let ack = session.receive_at(store, chunk, &now).await?;
Ok((ack, progress))
}
async fn handle_download_transfer_message<C, V>(
plan: &DownloadTransferGrantPlan,
store: &C,
validator: &V,
message: &async_nats::Message,
) -> Result<Vec<DownloadTransferChunk>, ServerError>
where
C: StoreResourceClient,
V: RequestValidator,
{
let context = transfer_request_context(message);
validate_transfer_request(
&plan.grant.subject,
&message.payload,
&context,
&plan.grant.session_key,
validator,
)
.await?;
let now = current_time_iso()?;
plan_download_transfer_chunks_at(plan, store, &now).await
}
fn transfer_request_context(message: &async_nats::Message) -> RequestContext {
RequestContext {
subject: message.subject.to_string(),
session_key: optional_header(message.headers.as_ref(), "session-key")
.map(ToString::to_string),
proof: optional_header(message.headers.as_ref(), "proof").map(ToString::to_string),
iat: optional_header(message.headers.as_ref(), "iat").and_then(|value| value.parse().ok()),
request_id: optional_header(message.headers.as_ref(), "request-id")
.map(ToString::to_string),
required_capabilities: None,
reply_to: message.reply.as_ref().map(ToString::to_string),
caller: None,
traceparent: optional_header(message.headers.as_ref(), "traceparent")
.map(ToString::to_string),
tracestate: optional_header(message.headers.as_ref(), "tracestate")
.map(ToString::to_string),
}
}
async fn validate_transfer_request<V>(
subject: &str,
payload: &Bytes,
context: &RequestContext,
expected_session_key: &str,
validator: &V,
) -> Result<(), ServerError>
where
V: RequestValidator,
{
let actual_session_key =
context
.session_key
.clone()
.ok_or_else(|| ServerError::MissingSessionKey {
subject: subject.to_string(),
})?;
if context.proof.as_deref().is_none_or(str::is_empty) {
return Err(ServerError::MissingProof {
subject: subject.to_string(),
});
}
if actual_session_key != expected_session_key {
return Err(ServerError::TransferSessionMismatch {
subject: subject.to_string(),
actual_session_key,
});
}
if validator.validate(subject, payload, context).await?.allowed {
Ok(())
} else {
Err(ServerError::RequestDenied {
subject: subject.to_string(),
session_key: actual_session_key,
})
}
}
async fn publish_download_chunks(
client: &async_nats::Client,
reply_to: String,
chunks: Vec<DownloadTransferChunk>,
) -> Result<(), ServerError> {
for chunk in chunks {
let mut headers = HeaderMap::new();
headers.insert(TRANSFER_SEQUENCE_HEADER, chunk.seq.to_string().as_str());
if chunk.eof {
headers.insert(TRANSFER_EOF_HEADER, "true");
}
client
.publish_with_headers(reply_to.clone(), headers, chunk.payload)
.await
.map_err(|error| ServerError::Nats(error.to_string()))?;
}
Ok(())
}
async fn publish_error_reply(
client: &async_nats::Client,
reply_to: String,
error: &ServerError,
) -> Result<(), ServerError> {
let reply = encode_error_reply(reply_to, error);
let mut headers = HeaderMap::new();
headers.insert("status", "error");
client
.publish_with_headers(reply.reply_to, headers, reply.payload)
.await
.map_err(|error| ServerError::Nats(error.to_string()))
}
fn required_header<'a>(
headers: Option<&'a HeaderMap>,
header: &'static str,
) -> Result<&'a str, ServerError> {
optional_header(headers, header).ok_or(ServerError::MissingTransferHeader { header })
}
fn optional_header<'a>(headers: Option<&'a HeaderMap>, header: &str) -> Option<&'a str> {
headers
.and_then(|headers| headers.get(header))
.map(async_nats::header::HeaderValue::as_str)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadTransferChunk {
pub seq: u64,
pub payload: Bytes,
pub eof: bool,
}
pub fn plan_upload_transfer_grant(
args: TransferUploadGrantArgs<'_>,
) -> Result<UploadTransferGrantPlan, ServerError> {
validate_chunk_bytes(args.chunk_bytes)?;
let store = store_binding(args.service_name, args.resources, args.store)?;
let max_bytes = effective_upload_max_bytes(args.max_bytes, store.max_object_bytes);
validate_transfer_id(args.transfer_id)?;
Ok(UploadTransferGrantPlan {
grant: UploadTransferGrant {
type_name: "TransferGrant".to_string(),
direction: "send".to_string(),
service: args.service_name.to_string(),
session_key: args.session_key.to_string(),
transfer_id: args.transfer_id.to_string(),
subject: transfer_subject(
UPLOAD_SUBJECT_PREFIX,
args.service_session_key,
args.transfer_id,
),
expires_at: args.expires_at.to_string(),
chunk_bytes: args.chunk_bytes,
max_bytes,
content_type: args.content_type.map(ToString::to_string),
metadata: args.metadata,
},
store_alias: args.store.to_string(),
store: store.name.clone(),
key: args.key.to_string(),
})
}
pub fn plan_download_transfer_grant(
args: TransferDownloadGrantArgs<'_>,
) -> Result<DownloadTransferGrantPlan, ServerError> {
validate_chunk_bytes(args.chunk_bytes)?;
let store = store_binding(args.service_name, args.resources, args.store)?;
enforce_max_object_bytes(
args.service_name,
args.store,
&args.info,
store.max_object_bytes,
)?;
validate_transfer_id(args.transfer_id)?;
Ok(DownloadTransferGrantPlan {
grant: DownloadTransferGrant {
type_name: "TransferGrant".to_string(),
direction: "receive".to_string(),
service: args.service_name.to_string(),
session_key: args.session_key.to_string(),
transfer_id: args.transfer_id.to_string(),
subject: transfer_subject(
DOWNLOAD_SUBJECT_PREFIX,
args.service_session_key,
args.transfer_id,
),
expires_at: args.expires_at.to_string(),
chunk_bytes: args.chunk_bytes,
info: args.info,
},
store_alias: args.store.to_string(),
store: store.name.clone(),
max_object_bytes: store
.max_object_bytes
.and_then(|value| u64::try_from(value).ok()),
})
}
pub async fn plan_download_transfer_chunks<C>(
plan: &DownloadTransferGrantPlan,
store: &C,
) -> Result<Vec<DownloadTransferChunk>, ServerError>
where
C: StoreResourceClient,
{
let now = current_time_iso()?;
plan_download_transfer_chunks_at(plan, store, &now).await
}
pub async fn plan_download_transfer_chunks_at<C>(
plan: &DownloadTransferGrantPlan,
store: &C,
now_iso: &str,
) -> Result<Vec<DownloadTransferChunk>, ServerError>
where
C: StoreResourceClient,
{
enforce_transfer_not_expired(&plan.grant.transfer_id, &plan.grant.expires_at, now_iso)?;
let key = &plan.grant.info.key;
let bytes = store
.read(key)
.await?
.ok_or_else(|| ServerError::TransferObjectMissing {
store: plan.store_alias.clone(),
key: key.clone(),
})?;
let actual_size = bytes.len() as u64;
if actual_size != plan.grant.info.size {
return Err(ServerError::TransferObjectSizeMismatch {
store: plan.store_alias.clone(),
key: key.clone(),
expected_size: plan.grant.info.size,
actual_size,
});
}
if let Some(max_bytes) = plan.max_object_bytes {
if actual_size > max_bytes {
return Err(ServerError::TransferObjectTooLarge {
service_name: plan.grant.service.clone(),
store: plan.store_alias.clone(),
key: key.clone(),
size: actual_size,
max_bytes,
});
}
}
let chunk_bytes = usize::try_from(plan.grant.chunk_bytes)
.unwrap_or(usize::MAX)
.max(1);
if bytes.is_empty() {
return Ok(vec![DownloadTransferChunk {
seq: 0,
payload: Bytes::new(),
eof: true,
}]);
}
let chunk_count = bytes.len().div_ceil(chunk_bytes);
Ok(bytes
.chunks(chunk_bytes)
.enumerate()
.map(|(index, chunk)| DownloadTransferChunk {
seq: index as u64,
payload: Bytes::copy_from_slice(chunk),
eof: index + 1 == chunk_count,
})
.collect())
}
fn store_binding<'a>(
service_name: &str,
resources: &'a ServiceResourceBindings,
store: &str,
) -> Result<&'a StoreResourceBinding, ServerError> {
resources
.store
.get(store)
.ok_or_else(|| ServerError::MissingResourceBinding {
service_name: service_name.to_string(),
resource_kind: "store".to_string(),
resource_name: store.to_string(),
})
}
fn effective_upload_max_bytes(
requested: Option<u64>,
store_max_object_bytes: Option<i64>,
) -> Option<u64> {
match (
requested,
store_max_object_bytes.and_then(|value| u64::try_from(value).ok()),
) {
(Some(requested), Some(store_max)) => Some(requested.min(store_max)),
(Some(requested), None) => Some(requested),
(None, Some(store_max)) => Some(store_max),
(None, None) => None,
}
}
fn enforce_max_object_bytes(
service_name: &str,
store: &str,
info: &FileTransferInfo,
store_max_object_bytes: Option<i64>,
) -> Result<(), ServerError> {
let Some(max_bytes) = store_max_object_bytes.and_then(|value| u64::try_from(value).ok()) else {
return Ok(());
};
if info.size > max_bytes {
return Err(ServerError::TransferObjectTooLarge {
service_name: service_name.to_string(),
store: store.to_string(),
key: info.key.clone(),
size: info.size,
max_bytes,
});
}
Ok(())
}
fn enforce_upload_max_bytes(plan: &UploadTransferGrantPlan, size: u64) -> Result<(), ServerError> {
let Some(max_bytes) = plan.grant.max_bytes else {
return Ok(());
};
if size > max_bytes {
return Err(ServerError::TransferObjectTooLarge {
service_name: plan.grant.service.clone(),
store: plan.store_alias.clone(),
key: plan.key.clone(),
size,
max_bytes,
});
}
Ok(())
}
fn validate_chunk_bytes(chunk_bytes: u64) -> Result<(), ServerError> {
if chunk_bytes == 0 {
return Err(ServerError::InvalidTransferChunkSize { chunk_bytes });
}
Ok(())
}
fn current_time_iso() -> Result<String, ServerError> {
OffsetDateTime::now_utc()
.format(&Rfc3339)
.map_err(|error| ServerError::InvalidTransferExpiry {
expires_at: "now".to_string(),
details: error.to_string(),
})
}
fn enforce_transfer_not_expired(
transfer_id: &str,
expires_at: &str,
now_iso: &str,
) -> Result<(), ServerError> {
let expires_at_time = parse_transfer_time(expires_at)?;
let now = parse_transfer_time(now_iso)?;
if now >= expires_at_time {
return Err(ServerError::TransferExpired {
transfer_id: transfer_id.to_string(),
expires_at: expires_at.to_string(),
});
}
Ok(())
}
fn transfer_expiry_delay(expires_at: &str) -> Result<Duration, ServerError> {
let expires_at_time = parse_transfer_time(expires_at)?;
let remaining = expires_at_time - OffsetDateTime::now_utc();
let millis = remaining.whole_milliseconds();
if millis <= 0 {
return Ok(Duration::ZERO);
}
Ok(Duration::from_millis(millis.min(u64::MAX as i128) as u64))
}
fn parse_transfer_time(value: &str) -> Result<OffsetDateTime, ServerError> {
OffsetDateTime::parse(value, &Rfc3339).map_err(|error| ServerError::InvalidTransferExpiry {
expires_at: value.to_string(),
details: error.to_string(),
})
}
fn transfer_subject(prefix: &str, session_key: &str, transfer_id: &str) -> String {
let session_prefix: String = session_key.chars().take(16).collect();
format!("{prefix}.{session_prefix}.{transfer_id}")
}
fn validate_transfer_id(transfer_id: &str) -> Result<(), ServerError> {
let invalid = transfer_id.is_empty()
|| transfer_id
.chars()
.any(|ch| matches!(ch, '.' | '*' | '>' | '/') || ch.is_whitespace() || ch.is_control());
if invalid {
return Err(ServerError::InvalidTransferId {
value: transfer_id.to_string(),
});
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures_util::future::BoxFuture;
use crate::service::RequestValidation;
use super::*;
#[derive(Debug, Clone)]
struct CountingValidator {
calls: Arc<AtomicUsize>,
allowed: bool,
}
impl RequestValidator for CountingValidator {
fn validate<'a>(
&'a self,
_subject: &'a str,
_payload: &'a Bytes,
_context: &'a RequestContext,
) -> BoxFuture<'a, Result<RequestValidation, ServerError>> {
Box::pin(async move {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(if self.allowed {
RequestValidation::allowed()
} else {
RequestValidation::denied()
})
})
}
}
#[tokio::test]
async fn transfer_validation_rejects_session_mismatch_before_validator() {
let calls = Arc::new(AtomicUsize::new(0));
let validator = CountingValidator {
calls: Arc::clone(&calls),
allowed: true,
};
let context = RequestContext {
subject: "transfer.v1.upload.session.transfer-1".to_string(),
session_key: Some("wrong-session".to_string()),
proof: Some("proof".to_string()),
iat: None,
request_id: None,
required_capabilities: None,
reply_to: None,
caller: None,
traceparent: None,
tracestate: None,
};
let error = validate_transfer_request(
"transfer.v1.upload.session.transfer-1",
&Bytes::new(),
&context,
"expected-session",
&validator,
)
.await
.expect_err("session mismatch");
assert!(matches!(
error,
ServerError::TransferSessionMismatch { actual_session_key, .. }
if actual_session_key == "wrong-session"
));
assert_eq!(calls.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn transfer_validation_requires_proof_before_session_mismatch() {
let calls = Arc::new(AtomicUsize::new(0));
let validator = CountingValidator {
calls: Arc::clone(&calls),
allowed: true,
};
let context = RequestContext {
subject: "transfer.v1.upload.session.transfer-1".to_string(),
session_key: Some("wrong-session".to_string()),
proof: None,
iat: None,
request_id: None,
required_capabilities: None,
reply_to: None,
caller: None,
traceparent: None,
tracestate: None,
};
let error = validate_transfer_request(
"transfer.v1.upload.session.transfer-1",
&Bytes::new(),
&context,
"expected-session",
&validator,
)
.await
.expect_err("missing proof");
assert!(matches!(error, ServerError::MissingProof { .. }));
assert_eq!(calls.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn transfer_validation_maps_denied_validator_to_request_denied() {
let calls = Arc::new(AtomicUsize::new(0));
let validator = CountingValidator {
calls: Arc::clone(&calls),
allowed: false,
};
let context = RequestContext {
subject: "transfer.v1.download.session.transfer-1".to_string(),
session_key: Some("expected-session".to_string()),
proof: Some("proof".to_string()),
iat: None,
request_id: None,
required_capabilities: None,
reply_to: None,
caller: None,
traceparent: None,
tracestate: None,
};
let error = validate_transfer_request(
"transfer.v1.download.session.transfer-1",
&Bytes::new(),
&context,
"expected-session",
&validator,
)
.await
.expect_err("denied");
assert!(matches!(
error,
ServerError::RequestDenied { session_key, .. } if session_key == "expected-session"
));
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
}