mod service;
pub use service::{ArchiveService, ArchiveServiceBuilder};
#[cfg(all(test, feature = "aws"))]
mod test_mock;
#[cfg(all(test, feature = "aws"))]
pub use test_mock::MockS3ArchiveManager;
#[cfg(feature = "aws")]
use crate::error::InklogError;
#[cfg(feature = "aws")]
use aws_config::meta::region::RegionProviderChain;
use chrono::{DateTime, Datelike, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use zeroize::{Zeroize, Zeroizing};
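/// A secret value (AWS credential or encryption key) that is zeroed on drop,
/// redacted in `Debug`, and always serialized as `None`.
///
/// A minimal usage sketch (illustrative only, not from the original source):
///
/// ```ignore
/// let mut secret = SecretString::new("example-access-key".to_string());
/// assert_eq!(secret.as_deref(), Some("example-access-key"));
/// let plain = secret.take(); // moves the value out as an unprotected String
/// assert!(secret.is_none());
/// ```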
#[derive(Clone, Default)]
pub struct SecretString(Option<Zeroizing<String>>);
impl SecretString {
pub fn new(value: String) -> Self {
Self(Some(Zeroizing::new(value)))
}
pub fn as_str_safe(&self) -> Option<&str> {
self.as_deref()
}
#[allow(dead_code)]
pub(crate) fn take_internal(&mut self) -> Option<String> {
self.take()
}
#[cfg(feature = "debug")]
pub fn take_audited(&mut self, operation: &str) -> Option<String> {
tracing::warn!(
event = "sensitive_data_access",
operation = operation,
"Sensitive data accessed via SecretString::take_audited()"
);
self.take()
}
pub fn take(&mut self) -> Option<String> {
self.0.take().map(|s| s.as_str().to_string())
}
pub fn as_deref(&self) -> Option<&str> {
self.0.as_ref().map(|s| s.as_str())
}
pub fn is_some(&self) -> bool {
self.0.is_some()
}
pub fn is_none(&self) -> bool {
self.0.is_none()
}
}
impl From<String> for SecretString {
fn from(value: String) -> Self {
Self::new(value)
}
}
impl From<Option<String>> for SecretString {
fn from(value: Option<String>) -> Self {
match value {
Some(s) => Self::new(s),
None => Self(None),
}
}
}
impl Drop for SecretString {
fn drop(&mut self) {
if let Some(s) = &mut self.0 {
s.zeroize();
}
}
}
impl std::fmt::Debug for SecretString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
Some(_) => write!(f, "SecretString(Some(***))"),
None => write!(f, "SecretString(None)"),
}
}
}
impl Serialize for SecretString {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_none()
}
}
impl<'de> Deserialize<'de> for SecretString {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
Option::<String>::deserialize(deserializer).map(|opt| opt.into())
}
}
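/// Configuration for the S3 archive pipeline.
///
/// Credential fields (`access_key_id`, `secret_access_key`, `session_token`) are
/// wrapped in [`SecretString`]; the custom `Debug` and `Serialize` implementations
/// below keep them (and other sensitive values) out of logs and serialized output.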
#[derive(Clone, Deserialize)]
#[serde(default)]
pub struct S3ArchiveConfig {
pub enabled: bool,
pub bucket: String,
pub region: String,
pub archive_interval_days: u32,
pub schedule_expression: Option<String>,
pub local_retention_days: u32,
pub local_retention_path: PathBuf,
pub compression: CompressionType,
pub storage_class: StorageClass,
pub prefix: String,
pub access_key_id: SecretString,
pub secret_access_key: SecretString,
pub session_token: SecretString,
pub endpoint_url: Option<String>,
pub force_path_style: bool,
pub skip_bucket_validation: bool,
pub max_file_size_mb: u32,
pub encryption: Option<EncryptionConfig>,
#[serde(default = "default_archive_format")]
pub archive_format: String,
#[serde(default)]
pub parquet_config: crate::config::ParquetConfig,
}
fn default_archive_format() -> String {
"json".to_string()
}
impl Default for S3ArchiveConfig {
fn default() -> Self {
Self {
enabled: false,
bucket: "logs-archive".to_string(),
region: "us-east-1".to_string(),
archive_interval_days: 7,
schedule_expression: None,
local_retention_days: 30,
local_retention_path: PathBuf::from("logs/archive_failures"),
compression: CompressionType::Zstd,
storage_class: StorageClass::Standard,
prefix: "logs/".to_string(),
access_key_id: SecretString::default(),
secret_access_key: SecretString::default(),
session_token: SecretString::default(),
endpoint_url: None,
force_path_style: false,
skip_bucket_validation: false,
max_file_size_mb: 100,
encryption: None,
archive_format: "json".to_string(),
parquet_config: crate::config::ParquetConfig::default(),
}
}
}
impl std::fmt::Debug for S3ArchiveConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3ArchiveConfig")
.field("enabled", &self.enabled)
.field("bucket", &self.bucket)
.field("region", &self.region)
.field("archive_interval_days", &self.archive_interval_days)
.field("schedule_expression", &self.schedule_expression)
.field("local_retention_days", &self.local_retention_days)
.field("local_retention_path", &self.local_retention_path)
.field("compression", &self.compression)
.field("storage_class", &self.storage_class)
.field("prefix", &self.prefix)
.field("endpoint_url", &self.endpoint_url.as_ref().map(|_| "***"))
.field("force_path_style", &self.force_path_style)
.field("skip_bucket_validation", &self.skip_bucket_validation)
.field("max_file_size_mb", &self.max_file_size_mb)
.field("encryption", &self.encryption.as_ref().map(|_| "***"))
.field("archive_format", &self.archive_format)
.field("parquet_config", &self.parquet_config)
.finish_non_exhaustive()
}
}
impl Serialize for S3ArchiveConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut state = serializer.serialize_struct("S3ArchiveConfig", 21)?;
state.serialize_field("enabled", &self.enabled)?;
state.serialize_field("bucket", &self.bucket)?;
state.serialize_field("region", &self.region)?;
state.serialize_field("archive_interval_days", &self.archive_interval_days)?;
state.serialize_field("schedule_expression", &self.schedule_expression)?;
state.serialize_field("local_retention_days", &self.local_retention_days)?;
state.serialize_field("local_retention_path", &self.local_retention_path)?;
state.serialize_field("compression", &self.compression)?;
state.serialize_field("storage_class", &self.storage_class)?;
state.serialize_field("prefix", &self.prefix)?;
state.serialize_field("endpoint_url", &self.endpoint_url)?;
state.serialize_field("force_path_style", &self.force_path_style)?;
state.serialize_field("skip_bucket_validation", &self.skip_bucket_validation)?;
state.serialize_field("max_file_size_mb", &self.max_file_size_mb)?;
state.serialize_field("encryption", &self.encryption)?;
state.serialize_field("archive_format", &self.archive_format)?;
state.serialize_field("parquet_config", &self.parquet_config)?;
state.end()
}
}
impl S3ArchiveConfig {
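    /// Emits tracing events for security-relevant settings (custom endpoint,
    /// SSE-C customer key, missing credentials). It currently warns rather than
    /// rejects, so it always returns `Ok(())`.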
pub fn validate_security(&self) -> Result<(), InklogError> {
if self.endpoint_url.is_some() {
tracing::warn!(
event = "sensitive_config_detected",
field = "endpoint_url",
"S3 archive configured with custom endpoint URL: custom endpoints may have different security characteristics"
);
}
if let Some(ref enc) = self.encryption {
if enc.customer_key.is_some() {
tracing::info!(
event = "customer_key_configured",
"S3 archive configured with customer-provided encryption key (SSE-C). Ensure key is stored securely."
);
}
}
let has_credentials = self.access_key_id.is_some() || self.secret_access_key.is_some();
if self.enabled && !has_credentials {
tracing::warn!(
event = "missing_credentials",
"S3 archive enabled but no AWS credentials configured. Ensure IAM role or external credential provider is available."
);
}
tracing::debug!(
event = "security_validation_complete",
has_endpoint_url = self.endpoint_url.is_some(),
has_customer_key = self
.encryption
.as_ref()
.map(|e| e.customer_key.is_some())
.unwrap_or(false),
has_credentials = has_credentials,
"Security validation completed for S3ArchiveConfig"
);
Ok(())
}
}
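/// Compression codec applied to archive payloads before upload.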
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
None,
Gzip,
Zstd,
Lz4,
Brotli,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageClass {
Standard,
IntelligentTiering,
StandardIa,
OnezoneIa,
Glacier,
GlacierDeepArchive,
ReducedRedundancy,
}
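/// Server-side encryption settings. `customer_key` (SSE-C) is a [`SecretString`]
/// and is excluded from the `Serialize` and `Debug` implementations below.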
#[derive(Clone, Deserialize)]
pub struct EncryptionConfig {
pub algorithm: EncryptionAlgorithm,
pub kms_key_id: Option<String>,
pub customer_key: SecretString,
}
impl Serialize for EncryptionConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut state = serializer.serialize_struct("EncryptionConfig", 3)?;
state.serialize_field("algorithm", &self.algorithm)?;
state.serialize_field("kms_key_id", &self.kms_key_id)?;
state.end()
}
}
impl std::fmt::Debug for EncryptionConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EncryptionConfig")
.field("algorithm", &self.algorithm)
.field("kms_key_id", &self.kms_key_id)
            .finish_non_exhaustive()
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EncryptionAlgorithm {
Aes256,
AwsKms,
CustomerKey,
}
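/// Tracks the archive scheduler's last run, failures, and a per-day lock.
/// `can_run_today` only blocks while a run locked for today is still in progress,
/// so a failed run may be retried the same day.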
#[derive(Debug, Clone, Default)]
pub struct ScheduleState {
pub last_scheduled_run: Option<DateTime<Utc>>,
pub last_successful_run: Option<DateTime<Utc>>,
pub last_run_status: Option<ArchiveStatus>,
pub consecutive_failures: u32,
pub locked_date: Option<chrono::NaiveDate>,
pub is_running: bool,
}
impl ScheduleState {
pub fn can_run_today(&self) -> bool {
let today = Utc::now().date_naive();
match self.locked_date {
Some(locked) if locked == today && self.is_running => false,
            Some(locked) if locked == today => true,
            _ => true,
        }
}
pub fn start_execution(&mut self) {
let now = Utc::now();
self.last_scheduled_run = Some(now);
self.locked_date = Some(now.date_naive());
self.is_running = true;
}
pub fn mark_success(&mut self) {
let now = Utc::now();
self.last_successful_run = Some(now);
self.last_run_status = Some(ArchiveStatus::Success);
self.consecutive_failures = 0;
self.is_running = false;
}
pub fn mark_failed(&mut self) {
self.last_run_status = Some(ArchiveStatus::Failed);
self.consecutive_failures += 1;
self.is_running = false;
}
}
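/// Manages uploads, listing, deletion, and restoration of log archives in S3.
/// Only compiled with the `aws` feature.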
#[cfg(feature = "aws")]
pub struct S3ArchiveManager {
config: S3ArchiveConfig,
client: aws_sdk_s3::Client,
}
#[cfg(feature = "aws")]
impl S3ArchiveManager {
pub async fn new(config: S3ArchiveConfig) -> Result<Self, InklogError> {
let aws_config = Self::build_aws_config(&config).await?;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.force_path_style(config.force_path_style)
.build();
let client = aws_sdk_s3::Client::from_conf(s3_config);
if !config.skip_bucket_validation {
Self::validate_bucket(&client, &config.bucket).await?;
}
Ok(Self { config, client })
}
async fn build_aws_config(
config: &S3ArchiveConfig,
) -> Result<aws_config::SdkConfig, InklogError> {
let region_provider =
RegionProviderChain::first_try(aws_types::region::Region::new(config.region.clone()));
let mut aws_config = aws_config::from_env()
.region(region_provider)
.behavior_version(aws_config::BehaviorVersion::latest());
if let Some(endpoint_url) = &config.endpoint_url {
aws_config = aws_config.endpoint_url(endpoint_url);
}
if config.access_key_id.is_some() && config.secret_access_key.is_some() {
let credentials = aws_credential_types::Credentials::new(
config.access_key_id.as_deref().unwrap_or(""),
config.secret_access_key.as_deref().unwrap_or(""),
config.session_token.as_deref().map(|s| s.to_string()),
None,
"inklog-s3-archive",
);
aws_config = aws_config.credentials_provider(credentials);
}
let sdk_config = aws_config.load().await;
Ok(sdk_config)
}
async fn validate_bucket(client: &aws_sdk_s3::Client, bucket: &str) -> Result<(), InklogError> {
client
.head_bucket()
.bucket(bucket)
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Bucket validation failed: {}", e)))?;
Ok(())
}
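    /// Compresses `log_data`, fills in the derived metadata fields (checksum of the
    /// uncompressed payload, sizes, date range), and uploads it. Payloads larger than
    /// 5 MB after compression go through multipart upload; smaller ones use a single
    /// `PutObject`. Returns the S3 key of the uploaded archive.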
pub async fn archive_logs(
&self,
log_data: Vec<u8>,
start_date: DateTime<Utc>,
end_date: DateTime<Utc>,
mut metadata: ArchiveMetadata,
) -> Result<String, InklogError> {
let checksum = Self::calculate_checksum(&log_data);
let key = self.generate_s3_key(&start_date, &end_date, &metadata);
let compressed_data = self.compress_data(log_data).await?;
let data_len = compressed_data.len();
metadata.compressed_size = data_len as i64;
metadata.checksum = checksum;
metadata.start_date = Some(start_date);
metadata.end_date = Some(end_date);
metadata.compression_type = Some(self.config.compression.clone());
metadata.storage_class = Some(self.config.storage_class.clone());
if data_len > 5 * 1024 * 1024 {
self.upload_multipart(&key, compressed_data, &start_date, &end_date, &metadata)
.await
} else {
self.upload_single_put(&key, compressed_data, &start_date, &end_date, &metadata)
.await
}
}
fn calculate_checksum(data: &[u8]) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
async fn upload_single_put(
&self,
key: &str,
data: Vec<u8>,
start_date: &DateTime<Utc>,
end_date: &DateTime<Utc>,
metadata: &ArchiveMetadata,
) -> Result<String, InklogError> {
let mut put_request = self
.client
.put_object()
.bucket(&self.config.bucket)
.key(key)
.body(data.into());
let storage_class = self.get_aws_storage_class();
put_request = put_request.storage_class(storage_class);
if let Some(encryption) = &self.config.encryption {
match encryption.algorithm {
EncryptionAlgorithm::Aes256 => {
put_request = put_request
.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256);
}
EncryptionAlgorithm::AwsKms => {
if let Some(kms_key_id) = &encryption.kms_key_id {
put_request = put_request
.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
.ssekms_key_id(kms_key_id);
} else {
put_request = put_request.server_side_encryption(
aws_sdk_s3::types::ServerSideEncryption::AwsKms,
);
}
}
EncryptionAlgorithm::CustomerKey => {
return Err(InklogError::ConfigError(
"Customer-provided encryption keys not yet implemented".to_string(),
));
}
}
}
put_request = put_request
.metadata("start-date", start_date.to_rfc3339())
.metadata("end-date", end_date.to_rfc3339())
.metadata("record-count", metadata.record_count.to_string())
.metadata("original-size", metadata.original_size.to_string())
.metadata("compressed-size", metadata.compressed_size.to_string())
.metadata(
"compression",
format!("{:?}", self.config.compression).to_lowercase(),
)
.metadata(
"storage-class",
format!("{:?}", self.config.storage_class).to_lowercase(),
)
.metadata("checksum", metadata.checksum.clone())
.metadata("archive-version", metadata.archive_version.clone())
.metadata("archive-type", metadata.archive_type.clone())
.metadata("status", format!("{:?}", metadata.status).to_lowercase());
let _response = put_request
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Upload failed: {}", e)))?;
Ok(key.to_string())
}
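    /// Uploads `data` in 5 MiB parts via the S3 multipart upload API, applying the
    /// configured storage class, encryption, and object metadata. Note: a failed part
    /// upload returns an error without aborting the in-progress multipart upload.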
async fn upload_multipart(
&self,
key: &str,
data: Vec<u8>,
start_date: &DateTime<Utc>,
end_date: &DateTime<Utc>,
metadata: &ArchiveMetadata,
) -> Result<String, InklogError> {
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
let mut create_request = self
.client
.create_multipart_upload()
.bucket(&self.config.bucket)
.key(key);
create_request = create_request.storage_class(self.get_aws_storage_class());
if let Some(encryption) = &self.config.encryption {
match encryption.algorithm {
EncryptionAlgorithm::Aes256 => {
create_request = create_request
.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256);
}
EncryptionAlgorithm::AwsKms => {
if let Some(kms_key_id) = &encryption.kms_key_id {
create_request = create_request
.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
.ssekms_key_id(kms_key_id);
} else {
create_request = create_request.server_side_encryption(
aws_sdk_s3::types::ServerSideEncryption::AwsKms,
);
}
}
EncryptionAlgorithm::CustomerKey => {
return Err(InklogError::ConfigError(
"Customer-provided encryption keys not yet implemented".to_string(),
));
}
}
}
create_request = create_request
.metadata("start-date", start_date.to_rfc3339())
.metadata("end-date", end_date.to_rfc3339())
.metadata("record-count", metadata.record_count.to_string())
.metadata("original-size", metadata.original_size.to_string())
.metadata("compressed-size", metadata.compressed_size.to_string())
.metadata(
"compression",
format!("{:?}", self.config.compression).to_lowercase(),
)
.metadata(
"storage-class",
format!("{:?}", self.config.storage_class).to_lowercase(),
)
.metadata("checksum", metadata.checksum.clone())
.metadata("archive-version", metadata.archive_version.clone())
.metadata("archive-type", metadata.archive_type.clone())
.metadata("status", format!("{:?}", metadata.status).to_lowercase());
let multipart_upload = create_request
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Multipart upload init failed: {}", e)))?;
let upload_id = multipart_upload
.upload_id()
.ok_or_else(|| InklogError::S3Error("No upload ID returned".to_string()))?;
        // 5 MiB parts: the minimum part size S3 accepts for multipart uploads (except the last part).
        let chunk_size = 5 * 1024 * 1024;
        let mut completed_parts = Vec::new();
for (i, chunk) in data.chunks(chunk_size).enumerate() {
let part_number = (i + 1) as i32;
let upload_part_response = self
.client
.upload_part()
.bucket(&self.config.bucket)
.key(key)
.upload_id(upload_id)
.part_number(part_number)
.body(chunk.to_vec().into())
.send()
.await
.map_err(|e| {
InklogError::S3Error(format!("Part {} upload failed: {}", part_number, e))
})?;
completed_parts.push(
CompletedPart::builder()
.e_tag(upload_part_response.e_tag().unwrap_or_default())
.part_number(part_number)
.build(),
);
}
let completed_upload = CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
self.client
.complete_multipart_upload()
.bucket(&self.config.bucket)
.key(key)
.upload_id(upload_id)
.multipart_upload(completed_upload)
.send()
.await
.map_err(|e| {
InklogError::S3Error(format!("Multipart upload completion failed: {}", e))
})?;
Ok(key.to_string())
}
fn get_aws_storage_class(&self) -> aws_sdk_s3::types::StorageClass {
match self.config.storage_class {
StorageClass::Standard => aws_sdk_s3::types::StorageClass::Standard,
StorageClass::IntelligentTiering => aws_sdk_s3::types::StorageClass::IntelligentTiering,
StorageClass::StandardIa => aws_sdk_s3::types::StorageClass::StandardIa,
StorageClass::OnezoneIa => aws_sdk_s3::types::StorageClass::OnezoneIa,
StorageClass::Glacier => aws_sdk_s3::types::StorageClass::Glacier,
StorageClass::GlacierDeepArchive => aws_sdk_s3::types::StorageClass::DeepArchive,
StorageClass::ReducedRedundancy => aws_sdk_s3::types::StorageClass::ReducedRedundancy,
}
}
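    /// Builds the object key as `<prefix>/<YYYY>/<MM>/logs_<start>_<end>_<count>.<ext>`,
    /// e.g. `logs/2024/05/logs_20240501_000000_20240507_235959_1000.parquet.zst`.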
fn generate_s3_key(
&self,
start_date: &DateTime<Utc>,
end_date: &DateTime<Utc>,
metadata: &ArchiveMetadata,
) -> String {
let base_prefix = self.config.prefix.trim_end_matches('/');
let date_prefix = format!(
"{}/{:04}/{:02}",
base_prefix,
start_date.year(),
start_date.month()
);
let filename = format!(
"logs_{}_{}_{}.parquet.{}",
start_date.format("%Y%m%d_%H%M%S"),
end_date.format("%Y%m%d_%H%M%S"),
metadata.record_count,
self.get_compression_extension()
);
format!("{}/{}", date_prefix, filename)
}
fn get_compression_extension(&self) -> &'static str {
match self.config.compression {
CompressionType::None => "parquet",
CompressionType::Gzip => "parquet.gz",
CompressionType::Zstd => "parquet.zst",
CompressionType::Lz4 => "parquet.lz4",
CompressionType::Brotli => "parquet.br",
}
}
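    /// Compresses the payload with the configured codec. Zstd inputs larger than
    /// 1 MiB use the multi-threaded bulk compressor (one worker per rayon thread);
    /// smaller inputs use single-shot `zstd::encode_all`.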
async fn compress_data(&self, data: Vec<u8>) -> Result<Vec<u8>, InklogError> {
match self.config.compression {
CompressionType::None => Ok(data),
CompressionType::Gzip => {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&data).map_err(InklogError::IoError)?;
encoder.finish().map_err(InklogError::IoError)
}
CompressionType::Zstd => {
if data.len() > 1024 * 1024 {
let mut encoder = zstd::bulk::Compressor::new(3)
.map_err(|e| InklogError::CompressionError(e.to_string()))?;
encoder
.set_parameter(zstd::zstd_safe::CParameter::NbWorkers(
rayon::current_num_threads() as u32,
))
.map_err(|e| InklogError::CompressionError(e.to_string()))?;
let output = encoder
.compress(&data)
.map_err(|e| InklogError::CompressionError(e.to_string()))?;
Ok(output)
} else {
zstd::encode_all(&data[..], 3)
.map_err(|e| InklogError::CompressionError(e.to_string()))
}
}
CompressionType::Lz4 => {
use lz4::EncoderBuilder;
use std::io::Write;
let mut encoder = EncoderBuilder::new()
.level(4)
.build(Vec::new())
.map_err(|e| InklogError::CompressionError(e.to_string()))?;
encoder.write_all(&data).map_err(InklogError::IoError)?;
let (result, _) = encoder.finish();
Ok(result)
}
CompressionType::Brotli => {
use brotli::enc::BrotliEncoderParams;
use brotli::CompressorReader;
use std::io::Read;
let params = BrotliEncoderParams {
quality: 6,
magic_number: true,
..Default::default()
};
let mut input = std::io::Cursor::new(data);
let mut output = Vec::new();
let mut compressor =
CompressorReader::new(&mut input, 4096, params.quality as u32, 22);
compressor
.read_to_end(&mut output)
.map_err(InklogError::IoError)?;
Ok(output)
}
}
}
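    /// Lists archives under the configured prefix (optionally narrowed by `prefix`)
    /// and filters them by last-modified date. Uses a single `ListObjectsV2` call,
    /// so results beyond the first page (1,000 objects) are not returned.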
pub async fn list_archives(
&self,
start_date: Option<DateTime<Utc>>,
end_date: Option<DateTime<Utc>>,
prefix: Option<String>,
) -> Result<Vec<ArchiveInfo>, InklogError> {
let mut list_request = self.client.list_objects_v2().bucket(&self.config.bucket);
let effective_prefix = if let Some(user_prefix) = prefix {
format!(
"{}/{}",
self.config.prefix.trim_end_matches('/'),
user_prefix
)
} else {
self.config.prefix.clone()
};
list_request = list_request.prefix(effective_prefix);
let response = list_request
.send()
.await
.map_err(|e| InklogError::S3Error(format!("List objects failed: {}", e)))?;
let mut archives = Vec::new();
let objects = response.contents();
for object in objects {
if let (Some(key), Some(last_modified), Some(size)) =
(object.key(), object.last_modified(), object.size())
{
let archive_date = DateTime::<Utc>::from_timestamp(
last_modified.secs(),
last_modified.subsec_nanos(),
)
.unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).unwrap_or_default());
let in_date_range = match (start_date, end_date) {
(Some(start), Some(end)) => archive_date >= start && archive_date <= end,
(Some(start), None) => archive_date >= start,
(None, Some(end)) => archive_date <= end,
(None, None) => true,
};
if in_date_range {
archives.push(ArchiveInfo {
key: key.to_string(),
size,
last_modified: archive_date,
storage_class: object.storage_class().map(|s| s.to_string()),
});
}
}
}
Ok(archives)
}
pub async fn delete_archive(&self, key: &str) -> Result<(), InklogError> {
self.client
.delete_object()
.bucket(&self.config.bucket)
.key(key)
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Delete failed: {}", e)))?;
Ok(())
}
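    /// Downloads and decompresses an archived object. If the object is in Glacier or
    /// Deep Archive, a restore request is issued instead and an error is returned
    /// telling the caller to retry once restoration completes.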
pub async fn restore_archive(&self, key: &str) -> Result<Vec<u8>, InklogError> {
let head_response = self
.client
.head_object()
.bucket(&self.config.bucket)
.key(key)
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Head object failed: {}", e)))?;
if let Some(storage_class) = head_response.storage_class() {
if matches!(
storage_class,
aws_sdk_s3::types::StorageClass::Glacier
| aws_sdk_s3::types::StorageClass::DeepArchive
) {
self.client
.restore_object()
.bucket(&self.config.bucket)
.key(key)
.restore_request(
aws_sdk_s3::types::RestoreRequest::builder()
                        .days(1)
                        .tier(aws_sdk_s3::types::Tier::Standard)
.build(),
)
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Restore request failed: {}", e)))?;
return Err(InklogError::S3Error(
"Archive restoration initiated. Object will be available in 3-5 hours for Glacier, 12 hours for Deep Archive".to_string()
));
}
}
let response = self
.client
.get_object()
.bucket(&self.config.bucket)
.key(key)
.send()
.await
.map_err(|e| InklogError::S3Error(format!("Get object failed: {}", e)))?;
let data = response
.body
.collect()
.await
.map_err(|e| InklogError::S3Error(format!("Read object body failed: {}", e)))?
.into_bytes();
self.decompress_data(data.to_vec()).await
}
async fn decompress_data(&self, data: Vec<u8>) -> Result<Vec<u8>, InklogError> {
match self.config.compression {
CompressionType::None => Ok(data),
CompressionType::Gzip => {
use flate2::read::GzDecoder;
use std::io::Read;
let mut decoder = GzDecoder::new(&data[..]);
let mut result = Vec::new();
decoder
.read_to_end(&mut result)
.map_err(InklogError::IoError)?;
Ok(result)
}
CompressionType::Zstd => zstd::decode_all(&data[..])
.map_err(|e| InklogError::CompressionError(e.to_string())),
CompressionType::Lz4 => {
use lz4::Decoder;
use std::io::Read;
let mut decoder = Decoder::new(&data[..])
.map_err(|e| InklogError::CompressionError(e.to_string()))?;
let mut result = Vec::new();
decoder
.read_to_end(&mut result)
.map_err(InklogError::IoError)?;
Ok(result)
}
CompressionType::Brotli => {
use brotli::Decompressor;
use std::io::Read;
let mut decoder = Decompressor::new(&data[..], data.len());
let mut result = Vec::new();
decoder
.read_to_end(&mut result)
.map_err(InklogError::IoError)?;
Ok(result)
}
}
}
}
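/// Lifecycle state of an archive run; defaults to `InProgress`.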
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ArchiveStatus {
#[default]
InProgress,
Success,
FailedLocal,
Failed,
}
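/// Metadata describing a single archive upload; also attached to the S3 object as
/// user-defined metadata.
///
/// A minimal builder-style sketch (illustrative only):
///
/// ```ignore
/// let meta = ArchiveMetadata::new(1_000, 4_096, "parquet")
///     .with_tag("daily")
///     .with_s3_key("logs/2024/05/example.parquet.zst".to_string())
///     .mark_success();
/// ```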
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveMetadata {
pub record_count: i64,
pub original_size: i64,
pub compressed_size: i64,
#[serde(default)]
pub compression_ratio: f64,
pub archive_type: String,
#[serde(skip)]
pub start_date: Option<DateTime<Utc>>,
#[serde(skip)]
pub end_date: Option<DateTime<Utc>>,
#[serde(skip)]
pub compression_type: Option<CompressionType>,
#[serde(skip)]
pub storage_class: Option<StorageClass>,
pub checksum: String,
#[serde(default = "default_archive_version")]
pub archive_version: String,
#[serde(default)]
pub parquet_version: Option<String>,
#[serde(default)]
pub row_group_count: i32,
pub tags: Vec<String>,
pub s3_key: String,
#[serde(default)]
pub status: ArchiveStatus,
}
fn default_archive_version() -> String {
"1.0".to_string()
}
impl ArchiveMetadata {
pub fn new(record_count: i64, original_size: i64, archive_type: &str) -> Self {
Self {
record_count,
original_size,
compressed_size: 0,
compression_ratio: 0.0,
archive_type: archive_type.to_string(),
start_date: None,
end_date: None,
compression_type: None,
storage_class: None,
checksum: String::new(),
archive_version: default_archive_version(),
parquet_version: None,
row_group_count: 0,
tags: vec![],
s3_key: String::new(),
status: ArchiveStatus::InProgress,
}
}
pub fn with_tag(mut self, tag: &str) -> Self {
self.tags.push(tag.to_string());
self
}
pub fn with_checksum(mut self, checksum: String) -> Self {
self.checksum = checksum;
self
}
pub fn with_s3_key(mut self, s3_key: String) -> Self {
self.s3_key = s3_key;
self
}
pub fn mark_success(mut self) -> Self {
if self.compressed_size > 0 {
self.compression_ratio = self.original_size as f64 / self.compressed_size as f64;
} else {
self.compression_ratio = 1.0;
}
self.status = ArchiveStatus::Success;
self
}
pub fn mark_failed_local(mut self) -> Self {
self.status = ArchiveStatus::FailedLocal;
self
}
pub fn mark_failed(mut self) -> Self {
self.status = ArchiveStatus::Failed;
self
}
}
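/// Summary of an archived object as returned by `list_archives`.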
#[derive(Debug, Clone)]
pub struct ArchiveInfo {
pub key: String,
pub size: i64,
pub last_modified: DateTime<Utc>,
pub storage_class: Option<String>,
}