use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use tensor_blob::BlobStore;
use crate::audit::{AuditEvent, AuditLogger};
use crate::auth;
use crate::config::{AuthConfig, ServerConfig};
use crate::convert::{blob_metadata_to_proto, upload_metadata_to_put_options};
use crate::metrics::ServerMetrics;
use crate::proto::{
blob_service_server::BlobService, ArtifactInfo, BlobDeleteRequest, BlobDeleteResponse,
BlobDownloadChunk, BlobDownloadRequest, BlobMetadataRequest, BlobUploadRequest,
BlobUploadResponse,
};
use crate::rate_limit::{Operation, RateLimiter};
pub struct BlobServiceImpl {
blob_store: Arc<Mutex<BlobStore>>,
chunk_size: usize,
auth_config: Option<AuthConfig>,
max_upload_size: usize,
stream_channel_capacity: usize,
rate_limiter: Option<Arc<RateLimiter>>,
audit_logger: Option<Arc<AuditLogger>>,
metrics: Option<Arc<ServerMetrics>>,
}
const DEFAULT_MAX_UPLOAD_SIZE: usize = 512 * 1024 * 1024;
const DEFAULT_STREAM_CHANNEL_CAPACITY: usize = 32;
impl BlobServiceImpl {
#[must_use]
pub const fn new(blob_store: Arc<Mutex<BlobStore>>) -> Self {
Self {
blob_store,
chunk_size: 64 * 1024, auth_config: None,
max_upload_size: DEFAULT_MAX_UPLOAD_SIZE,
stream_channel_capacity: DEFAULT_STREAM_CHANNEL_CAPACITY,
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub fn with_config(blob_store: Arc<Mutex<BlobStore>>, config: &ServerConfig) -> Self {
Self {
blob_store,
chunk_size: config.blob_chunk_size,
auth_config: config.auth.clone(),
max_upload_size: config.max_upload_size,
stream_channel_capacity: config.stream_channel_capacity,
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub const fn with_auth(blob_store: Arc<Mutex<BlobStore>>, auth_config: AuthConfig) -> Self {
Self {
blob_store,
chunk_size: 64 * 1024,
auth_config: Some(auth_config),
max_upload_size: DEFAULT_MAX_UPLOAD_SIZE,
stream_channel_capacity: DEFAULT_STREAM_CHANNEL_CAPACITY,
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub fn with_full_config(
blob_store: Arc<Mutex<BlobStore>>,
config: &ServerConfig,
rate_limiter: Option<Arc<RateLimiter>>,
audit_logger: Option<Arc<AuditLogger>>,
metrics: Option<Arc<ServerMetrics>>,
) -> Self {
Self {
blob_store,
chunk_size: config.blob_chunk_size,
auth_config: config.auth.clone(),
max_upload_size: config.max_upload_size,
stream_channel_capacity: config.stream_channel_capacity,
rate_limiter,
audit_logger,
metrics,
}
}
#[must_use]
pub const fn with_max_upload_size(mut self, size: usize) -> Self {
self.max_upload_size = size;
self
}
}
#[tonic::async_trait]
impl BlobService for BlobServiceImpl {
#[allow(clippy::too_many_lines)]
async fn upload(
&self,
request: Request<Streaming<BlobUploadRequest>>,
) -> Result<Response<BlobUploadResponse>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "upload", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::BlobOp) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "blob_op".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "blob_upload");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "upload", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let mut stream = request.into_inner();
let first_msg = stream
.next()
.await
.ok_or_else(|| Status::invalid_argument("empty upload stream"))?
.map_err(|e| Status::internal(format!("stream error: {e}")))?;
let metadata = match first_msg.request {
Some(crate::proto::blob_upload_request::Request::Metadata(m)) => m,
Some(crate::proto::blob_upload_request::Request::Chunk(_)) => {
return Err(Status::invalid_argument(
"first message must be metadata, not chunk",
));
},
None => {
return Err(Status::invalid_argument("empty request"));
},
};
let filename = metadata.filename.clone();
let options = upload_metadata_to_put_options(&metadata);
let mut data = Vec::new();
let max_size = self.max_upload_size;
while let Some(msg) = stream.next().await {
let msg = msg.map_err(|e| Status::internal(format!("stream error: {e}")))?;
match msg.request {
Some(crate::proto::blob_upload_request::Request::Chunk(chunk)) => {
if data.len().saturating_add(chunk.len()) > max_size {
return Err(Status::resource_exhausted(format!(
"upload exceeds maximum size of {max_size} bytes"
)));
}
data.extend_from_slice(&chunk);
},
Some(crate::proto::blob_upload_request::Request::Metadata(_)) => {
return Err(Status::invalid_argument(
"metadata can only appear as first message",
));
},
None => {
},
}
}
if data.is_empty() {
return Err(Status::invalid_argument("no data provided"));
}
let (artifact_id, meta) = {
let store = self.blob_store.lock().await;
let artifact_id = store.put(&filename, &data, options).await.map_err(|e| {
tracing::error!("Blob store error: {e}");
Status::internal("internal storage error")
})?;
let meta = store.metadata(&artifact_id).await.map_err(|e| {
tracing::error!("Blob metadata error: {e}");
Status::internal("internal storage error")
})?;
drop(store);
(artifact_id, meta)
};
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::BlobUpload {
identity: identity.clone(),
artifact_id: artifact_id.clone(),
size: meta.size,
},
None,
);
}
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "upload", true, latency_ms);
}
Ok(Response::new(BlobUploadResponse {
artifact_id,
size: meta.size as u64,
checksum: meta.checksum,
}))
}
type DownloadStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<BlobDownloadChunk, Status>> + Send>>;
async fn download(
&self,
request: Request<BlobDownloadRequest>,
) -> Result<Response<Self::DownloadStream>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "download", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::BlobOp) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "blob_op".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "blob_download");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "download", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let artifact_id = request.into_inner().artifact_id;
let chunk_size = self.chunk_size;
let store = self.blob_store.lock().await;
let data = match store.get(&artifact_id).await {
Ok(d) => d,
Err(e) => {
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "download", false, latency_ms);
}
if matches!(e, tensor_blob::BlobError::NotFound(_)) {
return Err(Status::not_found(format!(
"artifact not found: {artifact_id}"
)));
}
tracing::error!("Blob download error: {e}");
return Err(Status::internal("internal storage error"));
},
};
drop(store);
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::BlobDownload {
identity: identity.clone(),
artifact_id: artifact_id.clone(),
},
None,
);
}
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "download", true, latency_ms);
}
let (tx, rx) = mpsc::channel(self.stream_channel_capacity);
tokio::spawn(async move {
let chunks: Vec<_> = data.chunks(chunk_size).collect();
let total_chunks = chunks.len();
for (i, chunk_data) in chunks.into_iter().enumerate() {
let is_final = i == total_chunks - 1;
let chunk = BlobDownloadChunk {
data: chunk_data.to_vec(),
is_final,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
});
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream)))
}
async fn delete(
&self,
request: Request<BlobDeleteRequest>,
) -> Result<Response<BlobDeleteResponse>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "delete", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::BlobOp) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "blob_op".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "blob_delete");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "delete", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let artifact_id = request.into_inner().artifact_id;
{
let store = self.blob_store.lock().await;
if let Err(e) = store.delete(&artifact_id).await {
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "delete", false, latency_ms);
}
if matches!(e, tensor_blob::BlobError::NotFound(_)) {
return Err(Status::not_found(format!(
"artifact not found: {artifact_id}"
)));
}
tracing::error!("Blob delete error: {e}");
return Err(Status::internal("internal storage error"));
}
}
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::BlobDelete {
identity: identity.clone(),
artifact_id: artifact_id.clone(),
},
None,
);
}
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "delete", true, latency_ms);
}
Ok(Response::new(BlobDeleteResponse { success: true }))
}
async fn get_metadata(
&self,
request: Request<BlobMetadataRequest>,
) -> Result<Response<ArtifactInfo>, Status> {
let start = Instant::now();
let _identity = match auth::validate_request(&request, &self.auth_config) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "get_metadata", false, latency_ms);
}
return Err(status);
},
};
let artifact_id = request.into_inner().artifact_id;
let metadata = {
let store = self.blob_store.lock().await;
match store.metadata(&artifact_id).await {
Ok(m) => m,
Err(e) => {
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "get_metadata", false, latency_ms);
}
if matches!(e, tensor_blob::BlobError::NotFound(_)) {
return Err(Status::not_found(format!(
"artifact not found: {artifact_id}"
)));
}
tracing::error!("Blob metadata error: {e}");
return Err(Status::internal("internal storage error"));
},
}
};
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("blob", "get_metadata", true, latency_ms);
}
Ok(Response::new(blob_metadata_to_proto(&metadata)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tensor_blob::BlobConfig;
use tensor_store::TensorStore;
use tokio_stream::StreamExt;
async fn create_test_blob_store() -> Arc<Mutex<BlobStore>> {
let store = TensorStore::new();
let blob_store = BlobStore::new(store, BlobConfig::default()).await.unwrap();
Arc::new(Mutex::new(blob_store))
}
#[tokio::test]
async fn test_download_after_direct_upload() {
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"test.txt",
b"Hello, World!",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let service = BlobServiceImpl::new(blob_store);
let download_request = Request::new(BlobDownloadRequest {
artifact_id: artifact_id.clone(),
});
let mut download_stream = service
.download(download_request)
.await
.unwrap()
.into_inner();
let mut downloaded_data = Vec::new();
while let Some(chunk) = download_stream.next().await {
let chunk = chunk.unwrap();
downloaded_data.extend_from_slice(&chunk.data);
if chunk.is_final {
break;
}
}
assert_eq!(downloaded_data, b"Hello, World!");
}
#[tokio::test]
async fn test_download_not_found() {
let blob_store = create_test_blob_store().await;
let service = BlobServiceImpl::new(blob_store);
let request = Request::new(BlobDownloadRequest {
artifact_id: "nonexistent".to_string(),
});
let result = service.download(request).await;
let Err(err) = result else {
panic!("expected error");
};
assert_eq!(err.code(), tonic::Code::NotFound);
}
#[tokio::test]
async fn test_delete() {
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let service = BlobServiceImpl::new(Arc::clone(&blob_store));
let request = Request::new(BlobDeleteRequest {
artifact_id: artifact_id.clone(),
});
let response = service.delete(request).await.unwrap();
assert!(response.into_inner().success);
let store = blob_store.lock().await;
assert!(!store.exists(&artifact_id).await.unwrap());
}
#[tokio::test]
async fn test_delete_not_found() {
let blob_store = create_test_blob_store().await;
let service = BlobServiceImpl::new(blob_store);
let request = Request::new(BlobDeleteRequest {
artifact_id: "nonexistent".to_string(),
});
let result = service.delete(request).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().code(), tonic::Code::NotFound);
}
#[tokio::test]
async fn test_get_metadata() {
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"test.txt",
b"Hello",
tensor_blob::PutOptions::new()
.with_content_type("text/plain")
.with_tag("test"),
)
.await
.unwrap()
};
let service = BlobServiceImpl::new(blob_store);
let request = Request::new(BlobMetadataRequest { artifact_id });
let response = service.get_metadata(request).await.unwrap();
let info = response.into_inner();
assert_eq!(info.filename, "test.txt");
assert_eq!(info.content_type, "text/plain");
assert_eq!(info.size, 5);
assert!(info.tags.contains(&"test".to_string()));
}
#[tokio::test]
async fn test_get_metadata_not_found() {
let blob_store = create_test_blob_store().await;
let service = BlobServiceImpl::new(blob_store);
let request = Request::new(BlobMetadataRequest {
artifact_id: "nonexistent".to_string(),
});
let result = service.get_metadata(request).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().code(), tonic::Code::NotFound);
}
#[tokio::test]
async fn test_service_with_config() {
let blob_store = create_test_blob_store().await;
let config = ServerConfig::new().with_blob_chunk_size(32 * 1024);
let service = BlobServiceImpl::with_config(blob_store, &config);
assert_eq!(service.chunk_size, 32 * 1024);
}
#[tokio::test]
async fn test_upload_records_metrics() {
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let blob_store = create_test_blob_store().await;
let config = ServerConfig::new();
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let service =
BlobServiceImpl::with_full_config(blob_store, &config, None, None, Some(metrics));
let request = Request::new(BlobDownloadRequest { artifact_id });
let result = service.download(request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_download_records_metrics() {
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let blob_store = create_test_blob_store().await;
let config = ServerConfig::new();
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"test.txt",
b"download_test",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let service =
BlobServiceImpl::with_full_config(blob_store, &config, None, None, Some(metrics));
let request = Request::new(BlobDownloadRequest { artifact_id });
let result = service.download(request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_delete_records_metrics() {
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let blob_store = create_test_blob_store().await;
let config = ServerConfig::new();
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"test.txt",
b"delete_test",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let service = BlobServiceImpl::with_full_config(
Arc::clone(&blob_store),
&config,
None,
None,
Some(metrics),
);
let request = Request::new(BlobDeleteRequest { artifact_id });
let result = service.delete(request).await;
assert!(result.is_ok());
}
#[test]
fn test_with_full_config_all_parameters() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::metrics::ServerMetrics;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let store = TensorStore::new();
let blob_store = Arc::new(Mutex::new(
futures::executor::block_on(BlobStore::new(store, BlobConfig::default())).unwrap(),
));
let config = ServerConfig::new()
.with_blob_chunk_size(128 * 1024)
.with_max_upload_size(256 * 1024 * 1024);
let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default()));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = BlobServiceImpl::with_full_config(
blob_store,
&config,
Some(Arc::clone(&rate_limiter)),
Some(Arc::clone(&audit_logger)),
Some(Arc::clone(&metrics)),
);
assert_eq!(service.chunk_size, 128 * 1024);
assert_eq!(service.max_upload_size, 256 * 1024 * 1024);
assert!(service.rate_limiter.is_some());
assert!(service.audit_logger.is_some());
assert!(service.metrics.is_some());
}
#[test]
fn test_with_full_config_minimal_parameters() {
let store = TensorStore::new();
let blob_store = Arc::new(Mutex::new(
futures::executor::block_on(BlobStore::new(store, BlobConfig::default())).unwrap(),
));
let config = ServerConfig::new();
let service = BlobServiceImpl::with_full_config(blob_store, &config, None, None, None);
assert!(service.rate_limiter.is_none());
assert!(service.audit_logger.is_none());
assert!(service.metrics.is_none());
}
#[test]
fn test_constructor_new() {
let store = TensorStore::new();
let blob_store = Arc::new(Mutex::new(
futures::executor::block_on(BlobStore::new(store, BlobConfig::default())).unwrap(),
));
let service = BlobServiceImpl::new(blob_store);
assert_eq!(service.chunk_size, 64 * 1024);
assert_eq!(service.max_upload_size, DEFAULT_MAX_UPLOAD_SIZE);
assert!(service.auth_config.is_none());
assert!(service.rate_limiter.is_none());
}
#[test]
fn test_constructor_with_auth() {
use crate::config::{ApiKey, AuthConfig};
let store = TensorStore::new();
let blob_store = Arc::new(Mutex::new(
futures::executor::block_on(BlobStore::new(store, BlobConfig::default())).unwrap(),
));
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"test-key-1234567890".to_string(),
"user:test".to_string(),
));
let service = BlobServiceImpl::with_auth(blob_store, auth_config);
assert!(service.auth_config.is_some());
}
#[tokio::test]
async fn test_download_auth_required() {
use crate::config::{ApiKey, AuthConfig};
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"required-key-123456".to_string(),
"user:required".to_string(),
))
.with_anonymous(false);
let service = BlobServiceImpl::with_auth(Arc::clone(&blob_store), auth_config);
let request = Request::new(BlobDownloadRequest { artifact_id });
let result = service.download(request).await;
assert!(result.is_err());
if let Err(status) = result {
assert_eq!(status.code(), tonic::Code::Unauthenticated);
}
}
#[tokio::test]
async fn test_download_invalid_api_key() {
use crate::config::{ApiKey, AuthConfig};
use tonic::metadata::MetadataValue;
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"valid-key-123456789".to_string(),
"user:valid".to_string(),
))
.with_anonymous(false);
let service = BlobServiceImpl::with_auth(Arc::clone(&blob_store), auth_config);
let mut request = Request::new(BlobDownloadRequest { artifact_id });
request
.metadata_mut()
.insert("x-api-key", MetadataValue::from_static("wrong-key-000000"));
let result = service.download(request).await;
assert!(result.is_err());
if let Err(status) = result {
assert_eq!(status.code(), tonic::Code::Unauthenticated);
}
}
#[tokio::test]
async fn test_delete_auth_required() {
use crate::config::{ApiKey, AuthConfig};
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"delete-key-1234567".to_string(),
"user:delete".to_string(),
))
.with_anonymous(false);
let service = BlobServiceImpl::with_auth(Arc::clone(&blob_store), auth_config);
let request = Request::new(BlobDeleteRequest { artifact_id });
let result = service.delete(request).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().code(), tonic::Code::Unauthenticated);
}
#[tokio::test]
async fn test_metadata_auth_required() {
use crate::config::{ApiKey, AuthConfig};
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"meta-key-12345678".to_string(),
"user:meta".to_string(),
))
.with_anonymous(false);
let service = BlobServiceImpl::with_auth(Arc::clone(&blob_store), auth_config);
let request = Request::new(BlobMetadataRequest { artifact_id });
let result = service.get_metadata(request).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().code(), tonic::Code::Unauthenticated);
}
#[tokio::test]
async fn test_download_rate_limited() {
use crate::config::{ApiKey, AuthConfig};
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use tonic::metadata::MetadataValue;
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put("test.txt", b"data", tensor_blob::PutOptions::default())
.await
.unwrap()
};
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"rate-key-123456789".to_string(),
"user:rate".to_string(),
));
let rate_limiter = Arc::new(RateLimiter::new(
RateLimitConfig::new().with_max_blob_ops(1),
));
let config = ServerConfig::new().with_auth(auth_config);
let service = BlobServiceImpl::with_full_config(
Arc::clone(&blob_store),
&config,
Some(rate_limiter),
None,
None,
);
let mut request = Request::new(BlobDownloadRequest {
artifact_id: artifact_id.clone(),
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("rate-key-123456789"),
);
let result = service.download(request).await;
assert!(result.is_ok());
let mut request = Request::new(BlobDownloadRequest { artifact_id });
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("rate-key-123456789"),
);
let result = service.download(request).await;
assert!(result.is_err());
if let Err(status) = result {
assert_eq!(status.code(), tonic::Code::ResourceExhausted);
}
}
#[tokio::test]
async fn test_download_with_audit_logging() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::config::{ApiKey, AuthConfig};
use tonic::metadata::MetadataValue;
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"test.txt",
b"audit data",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"audit-key-12345678".to_string(),
"user:audit".to_string(),
));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let config = ServerConfig::new().with_auth(auth_config);
let service = BlobServiceImpl::with_full_config(
Arc::clone(&blob_store),
&config,
None,
Some(audit_logger),
None,
);
let mut request = Request::new(BlobDownloadRequest { artifact_id });
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("audit-key-12345678"),
);
let result = service.download(request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_delete_with_audit_logging() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::config::{ApiKey, AuthConfig};
use tonic::metadata::MetadataValue;
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"audit_delete.txt",
b"delete me",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"audit-del-key-1234".to_string(),
"user:audit_del".to_string(),
));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let config = ServerConfig::new().with_auth(auth_config);
let service = BlobServiceImpl::with_full_config(
Arc::clone(&blob_store),
&config,
None,
Some(audit_logger),
None,
);
let mut request = Request::new(BlobDeleteRequest { artifact_id });
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("audit-del-key-1234"),
);
let result = service.delete(request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_all_components_together() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::config::{ApiKey, AuthConfig};
use crate::metrics::ServerMetrics;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use tonic::metadata::MetadataValue;
let blob_store = create_test_blob_store().await;
let artifact_id = {
let store = blob_store.lock().await;
store
.put(
"full_test.txt",
b"full test",
tensor_blob::PutOptions::default(),
)
.await
.unwrap()
};
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"full-key-1234567890".to_string(),
"user:full".to_string(),
));
let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default()));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let config = ServerConfig::new().with_auth(auth_config);
let service = BlobServiceImpl::with_full_config(
Arc::clone(&blob_store),
&config,
Some(rate_limiter),
Some(audit_logger),
Some(metrics),
);
let mut request = Request::new(BlobDownloadRequest { artifact_id });
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("full-key-1234567890"),
);
let result = service.download(request).await;
assert!(result.is_ok());
}
}