use std::time::Duration;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::core::bulk_export::ExportJobId;
use crate::error::StorageResult;
use crate::tenant::TenantContext;
/// Identifies a single part file belonging to an export job.
///
/// The key is a plain value type: it derives `Clone`, `PartialEq`, `Eq` and
/// `Hash`, so two keys compare equal only when every field matches
/// (including `fencing_token`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExportPartKey {
/// Tenant the part belongs to, as an owned string.
pub tenant_id: String,
/// Export job the part belongs to.
pub job_id: ExportJobId,
/// Resource type name; used as the prefix of [`ExportPartKey::part_segment`].
pub resource_type: String,
/// Kind of file. The constructors in this file set it to `"output"` or
/// `"error"`; the field is public, so other values are not ruled out here.
pub file_type: String,
/// Index of this part; used as the suffix of [`ExportPartKey::part_segment`].
pub part_index: u32,
/// NOTE(review): presumably a token used to fence off stale writers —
/// its exact semantics are not visible in this file; confirm with the caller.
pub fencing_token: u64,
}
impl ExportPartKey {
    /// Shared constructor behind [`ExportPartKey::output`] and
    /// [`ExportPartKey::error`]; the two differ only in `file_type`.
    fn with_file_type(
        file_type: &str,
        tenant_id: String,
        job_id: ExportJobId,
        resource_type: String,
        part_index: u32,
        fencing_token: u64,
    ) -> Self {
        Self {
            tenant_id,
            job_id,
            resource_type,
            file_type: file_type.to_string(),
            part_index,
            fencing_token,
        }
    }

    /// Builds a key whose `file_type` is `"output"`.
    ///
    /// `tenant_id` and `resource_type` accept anything convertible into a
    /// `String`; the remaining arguments are stored unchanged.
    pub fn output(
        tenant_id: impl Into<String>,
        job_id: ExportJobId,
        resource_type: impl Into<String>,
        part_index: u32,
        fencing_token: u64,
    ) -> Self {
        Self::with_file_type(
            "output",
            tenant_id.into(),
            job_id,
            resource_type.into(),
            part_index,
            fencing_token,
        )
    }

    /// Builds a key whose `file_type` is `"error"`.
    ///
    /// Takes the same arguments as [`ExportPartKey::output`] and stores them
    /// the same way.
    pub fn error(
        tenant_id: impl Into<String>,
        job_id: ExportJobId,
        resource_type: impl Into<String>,
        part_index: u32,
        fencing_token: u64,
    ) -> Self {
        Self::with_file_type(
            "error",
            tenant_id.into(),
            job_id,
            resource_type.into(),
            part_index,
            fencing_token,
        )
    }

    /// Returns `"<resource_type>-<part_index>"`.
    ///
    /// Only those two fields take part; tenant, job, file type and fencing
    /// token are not included in the segment.
    pub fn part_segment(&self) -> String {
        let index = self.part_index.to_string();
        [self.resource_type.as_str(), index.as_str()].join("-")
    }
}
/// An async byte sink paired with running line and byte counters.
///
/// All fields are public, so the counters reflect writes made through
/// `write_line` only as long as callers do not modify them or write to
/// `writer` directly.
pub struct ExportPartWriter {
/// Pinned, boxed async writer that receives the bytes; must be `Send`.
pub writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>,
/// Incremented by one for each successful `write_line` call.
pub line_count: u64,
/// Incremented by the line's byte length plus one (the newline) for each
/// successful `write_line` call.
pub byte_count: u64,
}
impl ExportPartWriter {
    /// Wraps `writer` with both counters starting at zero.
    pub fn new(writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>) -> Self {
        let (line_count, byte_count) = (0, 0);
        Self {
            writer,
            line_count,
            byte_count,
        }
    }

    /// Writes `line` followed by a single `\n` and updates the counters.
    ///
    /// `line` is written as-is; it is not inspected for embedded newlines.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from either write. The counters are only updated
    /// after both writes have succeeded, so on error they are left unchanged
    /// even if some bytes already reached the underlying writer.
    pub async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
        use tokio::io::AsyncWriteExt;

        const NEWLINE: &[u8] = b"\n";

        let payload = line.as_bytes();
        self.writer.write_all(payload).await?;
        self.writer.write_all(NEWLINE).await?;

        // One line, and its bytes plus the one-byte terminator.
        self.line_count += 1;
        self.byte_count += payload.len() as u64 + 1;
        Ok(())
    }

    /// Flushes the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the underlying writer's flush reports.
    pub async fn flush(&mut self) -> std::io::Result<()> {
        use tokio::io::AsyncWriteExt;

        let sink = &mut self.writer;
        sink.flush().await
    }
}
/// Manual `Debug` that prints only the counters.
///
/// The `writer` field is a boxed trait object without a `Debug` bound, so it
/// cannot be printed. The output ends with `..` to make it visible that a
/// field has been left out rather than presenting the struct as complete.
impl std::fmt::Debug for ExportPartWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExportPartWriter")
            .field("line_count", &self.line_count)
            .field("byte_count", &self.byte_count)
            // `writer` is intentionally omitted; mark the output as partial.
            .finish_non_exhaustive()
    }
}
/// Description of a part after finalization; this is the success value of
/// `ExportOutputStore::finalize_part`.
#[derive(Debug, Clone)]
pub struct FinalizedPart {
/// Key of the part this record describes.
pub key: ExportPartKey,
/// Resource type of the part.
/// NOTE(review): `key` also carries a `resource_type`; whether the two may
/// differ is not visible here — confirm.
pub resource_type: String,
/// Number of lines in the part — presumably taken from the writer's
/// `line_count`; verify against the store implementation.
pub line_count: u64,
/// Size of the part in bytes — presumably taken from the writer's
/// `byte_count` or the stored object size; verify against the implementation.
pub size_bytes: u64,
}
/// A URL for fetching a part, as returned by `ExportOutputStore::download_url`.
#[derive(Debug, Clone)]
pub struct DownloadUrl {
/// The URL, as a string. It is not parsed or validated by this type.
pub url: String,
/// Flag set by the store implementation.
/// NOTE(review): by its name, indicates that the client must present an
/// access token when fetching `url` — confirm with the implementations.
pub requires_access_token: bool,
}
/// Storage backend interface for export part files.
///
/// Implementors must be `Send + Sync`. Every method is async (via
/// `async_trait`) and reports failures through `StorageResult`.
#[async_trait]
pub trait ExportOutputStore: Send + Sync {
/// Opens an [`ExportPartWriter`] for the part identified by `key`.
async fn open_writer(&self, key: &ExportPartKey) -> StorageResult<ExportPartWriter>;
/// Finalizes the part identified by `key`, consuming `writer`, and returns
/// a [`FinalizedPart`] describing it.
///
/// NOTE(review): whether this flushes `writer` itself or expects the caller
/// to have flushed is up to the implementation — confirm.
async fn finalize_part(
&self,
key: &ExportPartKey,
writer: ExportPartWriter,
) -> StorageResult<FinalizedPart>;
/// Produces a [`DownloadUrl`] for the part identified by `key`.
///
/// NOTE(review): `ttl` is presumably how long the URL should remain valid;
/// the trait itself does not enforce this — confirm with implementations.
async fn download_url(&self, key: &ExportPartKey, ttl: Duration) -> StorageResult<DownloadUrl>;
/// Opens an async reader over the part identified by `key`, returned as a
/// pinned, boxed `AsyncRead + Send` trait object.
async fn open_reader(
&self,
key: &ExportPartKey,
) -> StorageResult<std::pin::Pin<Box<dyn AsyncRead + Send>>>;
/// Deletes the outputs of job `job_id` in the context of `tenant`.
///
/// NOTE(review): behavior when the job has no outputs (error vs. no-op) is
/// not specified here — confirm with implementations.
async fn delete_job_outputs(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
) -> StorageResult<()>;
}