use std::sync::Arc;
use async_trait::async_trait;
use modkit_db::DBProvider;
use modkit_db::outbox::{LeasedMessageHandler, MessageResult, OutboxMessage};
use modkit_security::SecurityContext;
use serde::Deserialize;
use tracing::{info, warn};
use crate::domain::ports::{FileStorageProvider, metric_labels};
type DbProvider = DBProvider<modkit_db::DbError>;
type AttachmentRepo = crate::infra::db::repo::attachment_repo::AttachmentRepository;
fn tenant_security_context(tenant_id: uuid::Uuid) -> SecurityContext {
#[allow(clippy::expect_used)]
SecurityContext::builder()
.subject_tenant_id(tenant_id)
.subject_id(modkit_security::constants::DEFAULT_SUBJECT_ID)
.build()
.expect("tenant SecurityContext must build with tenant_id + subject_id")
}
pub struct AttachmentCleanupHandler {
file_storage: Arc<dyn FileStorageProvider>,
attachment_repo: AttachmentRepo,
chat_repo: ChatRepo,
db: Arc<DbProvider>,
max_attempts: u32,
metrics: Arc<dyn crate::domain::ports::MiniChatMetricsPort>,
}
impl AttachmentCleanupHandler {
pub fn new(
file_storage: Arc<dyn FileStorageProvider>,
db: Arc<DbProvider>,
chat_repo: ChatRepo,
max_attempts: u32,
metrics: Arc<dyn crate::domain::ports::MiniChatMetricsPort>,
) -> Self {
Self {
file_storage,
attachment_repo: crate::infra::db::repo::attachment_repo::AttachmentRepository,
chat_repo,
db,
max_attempts,
metrics,
}
}
}
#[derive(Debug, Deserialize)]
struct AttachmentCleanupPayload {
#[allow(dead_code)]
event_type: String,
tenant_id: uuid::Uuid,
#[allow(dead_code)]
chat_id: uuid::Uuid,
attachment_id: uuid::Uuid,
provider_file_id: Option<String>,
storage_backend: String,
#[allow(dead_code)]
attachment_kind: String,
}
#[async_trait]
impl LeasedMessageHandler for AttachmentCleanupHandler {
#[tracing::instrument(name = "worker", skip_all, fields(worker = "attachment_cleanup"))]
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
let event: AttachmentCleanupPayload = match serde_json::from_slice(&msg.payload) {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "attachment cleanup: invalid payload");
return MessageResult::Reject(format!("invalid payload: {e}"));
}
};
tracing::debug!(
attachment_id = %event.attachment_id,
storage_backend = %event.storage_backend,
has_provider_file = event.provider_file_id.is_some(),
"attachment cleanup: processing"
);
{
use crate::domain::repos::ChatRepository as _;
let conn = match self.db.conn() {
Ok(c) => c,
Err(e) => {
warn!(error = %e, "attachment cleanup: db conn failed");
return MessageResult::Retry;
}
};
match self.chat_repo.is_deleted_system(&conn, event.chat_id).await {
Ok(true) => {
tracing::debug!(
attachment_id = %event.attachment_id,
chat_id = %event.chat_id,
"attachment cleanup: parent chat soft-deleted - ownership transferred, acking"
);
return MessageResult::Ok;
}
Ok(false) => {} Err(e) => {
warn!(error = %e, "attachment cleanup: db error checking chat");
return MessageResult::Retry;
}
}
}
let Some(ref provider_file_id) = event.provider_file_id else {
tracing::debug!(attachment_id = %event.attachment_id, "attachment cleanup: no provider file - marking done");
if let Err(e) = self.mark_done(event.attachment_id).await {
warn!(attachment_id = %event.attachment_id, error = %e, "attachment cleanup: failed to mark done");
return MessageResult::Retry;
}
return MessageResult::Ok;
};
let ctx = tenant_security_context(event.tenant_id);
if let Err(e) = self
.file_storage
.delete_file(ctx, &event.storage_backend, provider_file_id)
.await
{
warn!(
attachment_id = %event.attachment_id,
error = %e,
"attachment cleanup: provider delete failed"
);
return self
.record_failure(event.attachment_id, &e.to_string())
.await;
}
if let Err(e) = self.mark_done(event.attachment_id).await {
warn!(attachment_id = %event.attachment_id, error = %e, "attachment cleanup: failed to mark done after provider delete");
return MessageResult::Retry;
}
self.metrics
.record_cleanup_completed(metric_labels::resource_type::FILE);
info!(attachment_id = %event.attachment_id, "attachment cleanup: done");
MessageResult::Ok
}
}
impl AttachmentCleanupHandler {
async fn mark_done(
&self,
attachment_id: uuid::Uuid,
) -> Result<(), crate::domain::error::DomainError> {
use crate::domain::repos::AttachmentRepository as _;
let conn = self
.db
.conn()
.map_err(crate::domain::error::DomainError::from)?;
self.attachment_repo
.mark_cleanup_done(&conn, attachment_id)
.await?;
Ok(())
}
#[allow(clippy::cognitive_complexity)]
async fn record_failure(&self, attachment_id: uuid::Uuid, error: &str) -> MessageResult {
use crate::domain::repos::{AttachmentRepository as _, CleanupOutcome};
let conn = match self.db.conn() {
Ok(c) => c,
Err(e) => {
warn!(error = %e, "record_failure: db conn failed");
return MessageResult::Retry;
}
};
match self
.attachment_repo
.record_cleanup_attempt(&conn, attachment_id, error, self.max_attempts)
.await
{
Ok(CleanupOutcome::TerminalFailure) => {
warn!(attachment_id = %attachment_id, "attachment cleanup: max attempts reached -- terminal failure");
self.metrics
.record_cleanup_failed(metric_labels::resource_type::FILE);
MessageResult::Reject(format!("max attempts ({}) reached", self.max_attempts))
}
Ok(CleanupOutcome::AlreadyTerminal) => {
tracing::debug!(attachment_id = %attachment_id, "attachment cleanup: already terminal (stale redelivery)");
MessageResult::Ok
}
Ok(CleanupOutcome::StillPending) => {
self.metrics
.record_cleanup_retry(metric_labels::resource_type::FILE, error);
MessageResult::Retry
}
Err(e) => {
warn!(error = %e, "record_failure: db error recording attempt");
MessageResult::Retry
}
}
}
}
type ChatRepo = crate::infra::db::repo::chat_repo::ChatRepository;
type VectorStoreRepo = crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
pub struct ChatCleanupHandler {
file_storage: Arc<dyn FileStorageProvider>,
vs_provider: Arc<dyn crate::domain::ports::VectorStoreProvider>,
attachment_repo: AttachmentRepo,
vector_store_repo: VectorStoreRepo,
chat_repo: ChatRepo,
db: Arc<DbProvider>,
max_attempts: u32,
metrics: Arc<dyn crate::domain::ports::MiniChatMetricsPort>,
}
impl ChatCleanupHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
file_storage: Arc<dyn FileStorageProvider>,
vs_provider: Arc<dyn crate::domain::ports::VectorStoreProvider>,
db: Arc<DbProvider>,
chat_repo: ChatRepo,
max_attempts: u32,
metrics: Arc<dyn crate::domain::ports::MiniChatMetricsPort>,
) -> Self {
Self {
file_storage,
vs_provider,
attachment_repo: crate::infra::db::repo::attachment_repo::AttachmentRepository,
vector_store_repo: crate::infra::db::repo::vector_store_repo::VectorStoreRepository,
chat_repo,
db,
max_attempts,
metrics,
}
}
}
#[derive(Debug, Deserialize)]
struct ChatCleanupPayload {
reason: crate::domain::repos::CleanupReason,
tenant_id: uuid::Uuid,
chat_id: uuid::Uuid,
#[allow(dead_code)]
system_request_id: uuid::Uuid,
}
#[async_trait]
impl LeasedMessageHandler for ChatCleanupHandler {
#[tracing::instrument(name = "worker", skip_all, fields(worker = "chat_cleanup"))]
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
use crate::domain::repos::{
AttachmentRepository as _, ChatRepository as _, VectorStoreRepository as _,
};
let event: ChatCleanupPayload = match serde_json::from_slice(&msg.payload) {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "chat cleanup: invalid payload");
return MessageResult::Reject(format!("invalid payload: {e}"));
}
};
let chat_id = event.chat_id;
let tenant_id = event.tenant_id;
tracing::debug!(chat_id = %chat_id, tenant_id = %tenant_id, reason = ?event.reason, "chat cleanup: processing");
let conn = match self.db.conn() {
Ok(c) => c,
Err(e) => {
warn!(error = %e, "chat cleanup: db conn failed");
return MessageResult::Retry;
}
};
match self.chat_repo.is_deleted_system(&conn, chat_id).await {
Ok(true) => {} Ok(false) => {
warn!(chat_id = %chat_id, "chat cleanup: chat is not soft-deleted -- rejecting");
return MessageResult::Reject("chat is not soft-deleted".to_owned());
}
Err(e) => {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: db error checking chat");
return MessageResult::Retry;
}
}
let pending = match self
.attachment_repo
.find_pending_cleanup_by_chat(&conn, chat_id)
.await
{
Ok(p) => p,
Err(e) => {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: db error loading attachments");
return MessageResult::Retry;
}
};
let mut any_still_pending = false;
for att in &pending {
if let Some(ref provider_file_id) = att.provider_file_id {
let ctx = tenant_security_context(event.tenant_id);
if let Err(e) = self
.file_storage
.delete_file(ctx, &att.storage_backend, provider_file_id)
.await
{
warn!(
chat_id = %chat_id,
attachment_id = %att.id,
error = %e,
"chat cleanup: provider file delete failed"
);
let error_str = e.to_string();
match self
.attachment_repo
.record_cleanup_attempt(&conn, att.id, &error_str, self.max_attempts)
.await
{
Ok(crate::domain::repos::CleanupOutcome::StillPending) => {
self.metrics.record_cleanup_retry(
metric_labels::resource_type::FILE,
&error_str,
);
any_still_pending = true;
}
Ok(crate::domain::repos::CleanupOutcome::TerminalFailure) => {
self.metrics
.record_cleanup_failed(metric_labels::resource_type::FILE);
warn!(chat_id = %chat_id, attachment_id = %att.id, "chat cleanup: attachment terminal failure");
}
Ok(crate::domain::repos::CleanupOutcome::AlreadyTerminal) => {
tracing::debug!(chat_id = %chat_id, attachment_id = %att.id, "chat cleanup: attachment already terminal (stale)");
}
Err(db_err) => {
warn!(chat_id = %chat_id, attachment_id = %att.id, error = %db_err, "chat cleanup: db error recording attempt");
any_still_pending = true;
}
}
continue;
}
}
if let Err(e) = self.attachment_repo.mark_cleanup_done(&conn, att.id).await {
warn!(chat_id = %chat_id, attachment_id = %att.id, error = %e, "chat cleanup: failed to mark done");
any_still_pending = true;
continue;
}
if att.provider_file_id.is_some() {
self.metrics
.record_cleanup_completed(metric_labels::resource_type::FILE);
}
tracing::debug!(chat_id = %chat_id, attachment_id = %att.id, "chat cleanup: attachment done");
}
if any_still_pending {
return MessageResult::Retry;
}
let vs_row = match self
.vector_store_repo
.find_by_chat_system(&conn, chat_id)
.await
{
Ok(vs) => vs,
Err(e) => {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: db error loading vector store");
return MessageResult::Retry;
}
};
if let Some(vs_row) = vs_row {
match self
.attachment_repo
.find_pending_cleanup_by_chat(&conn, chat_id)
.await
{
Ok(still) if !still.is_empty() => {
return MessageResult::Retry;
}
Err(e) => {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: db error re-checking attachments");
return MessageResult::Retry;
}
_ => {}
}
let failed_count = match self
.attachment_repo
.count_failed_cleanup_by_chat(&conn, chat_id)
.await
{
Ok(c) => c,
Err(e) => {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: db error counting failed attachments");
return MessageResult::Retry;
}
};
if failed_count > 0 {
warn!(
chat_id = %chat_id,
failed_count,
"chat cleanup: deleting vector store with failed attachment cleanup"
);
self.metrics.record_cleanup_vs_with_failed_attachments();
}
if let Some(ref vs_id) = vs_row.vector_store_id {
let vs_ctx = tenant_security_context(event.tenant_id);
if let Err(e) = self
.vs_provider
.delete_vector_store(vs_ctx, &vs_row.provider, vs_id)
.await
{
let reason = format!("vector store delete failed: {e}");
warn!(chat_id = %chat_id, vector_store_id = vs_id, error = %e, "chat cleanup: vector store delete failed");
self.metrics
.record_cleanup_retry(metric_labels::resource_type::VECTOR_STORE, &reason);
return MessageResult::Retry;
}
info!(chat_id = %chat_id, vector_store_id = vs_id, "chat cleanup: vector store deleted on provider");
}
if let Err(e) = self.vector_store_repo.delete_system(&conn, vs_row.id).await {
warn!(chat_id = %chat_id, error = %e, "chat cleanup: failed to delete VS row");
return MessageResult::Retry;
}
if vs_row.vector_store_id.is_some() {
self.metrics
.record_cleanup_completed(metric_labels::resource_type::VECTOR_STORE);
}
info!(chat_id = %chat_id, "chat cleanup: vector store row removed");
}
info!(chat_id = %chat_id, "chat cleanup: complete");
MessageResult::Ok
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_msg() -> OutboxMessage {
OutboxMessage {
partition_id: 1,
seq: 1,
payload: b"{}".to_vec(),
payload_type: "application/json".to_owned(),
created_at: chrono::Utc::now(),
attempts: 0i16,
}
}
fn make_cleanup_payload(provider_file_id: Option<&str>) -> OutboxMessage {
let event = serde_json::json!({
"event_type": "attachment_deleted",
"tenant_id": "00000000-0000-0000-0000-000000000001",
"chat_id": "00000000-0000-0000-0000-000000000002",
"attachment_id": "00000000-0000-0000-0000-000000000003",
"provider_file_id": provider_file_id,
"vector_store_id": null,
"storage_backend": "openai",
"attachment_kind": "document",
"deleted_at": "2026-01-01T00:00:00Z"
});
OutboxMessage {
partition_id: 1,
seq: 1,
payload: serde_json::to_vec(&event).unwrap(),
payload_type: "application/json".to_owned(),
created_at: chrono::Utc::now(),
attempts: 0i16,
}
}
#[tokio::test]
async fn attachment_handler_rejects_invalid_payload() {
use crate::domain::service::test_helpers::inmem_db;
let db = inmem_db().await;
let db_provider = crate::domain::service::test_helpers::mock_db_provider(db);
let handler = AttachmentCleanupHandler::new(
Arc::new(crate::domain::service::test_helpers::NoopFileStorage),
db_provider,
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
5,
Arc::new(crate::domain::ports::metrics::NoopMetrics),
);
let msg = make_msg(); let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Reject(_)),
"invalid payload should be rejected"
);
}
#[tokio::test]
async fn attachment_handler_succeeds_no_provider_file() {
use crate::domain::service::test_helpers::inmem_db;
let db = inmem_db().await;
let db_provider = crate::domain::service::test_helpers::mock_db_provider(db);
let handler = AttachmentCleanupHandler::new(
Arc::new(crate::domain::service::test_helpers::NoopFileStorage),
db_provider,
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
5,
Arc::new(crate::domain::ports::metrics::NoopMetrics),
);
let msg = make_cleanup_payload(None);
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Ok | MessageResult::Retry),
"no provider file should not reject"
);
}
#[tokio::test]
async fn deserialize_cleanup_payload() {
let msg = make_cleanup_payload(Some("file-abc123"));
let payload: AttachmentCleanupPayload =
serde_json::from_slice(&msg.payload).expect("deserialization should succeed");
assert_eq!(
payload.attachment_id.to_string(),
"00000000-0000-0000-0000-000000000003"
);
assert_eq!(payload.provider_file_id.as_deref(), Some("file-abc123"));
assert_eq!(payload.storage_backend, "openai");
}
fn make_chat_cleanup_payload(chat_id: uuid::Uuid) -> OutboxMessage {
let event = serde_json::json!({
"reason": "chat_soft_delete",
"tenant_id": uuid::Uuid::new_v4().to_string(),
"chat_id": chat_id.to_string(),
"system_request_id": uuid::Uuid::new_v4().to_string(),
"chat_deleted_at": "2026-01-01T00:00:00+00:00",
});
OutboxMessage {
partition_id: 1,
seq: 1,
payload: serde_json::to_vec(&event).unwrap(),
payload_type: "application/json".to_owned(),
created_at: chrono::Utc::now(),
attempts: 0i16,
}
}
fn build_chat_handler(db_provider: Arc<DbProvider>) -> ChatCleanupHandler {
use crate::domain::service::test_helpers::{NoopFileStorage, NoopVectorStoreProvider};
ChatCleanupHandler::new(
Arc::new(NoopFileStorage),
Arc::new(NoopVectorStoreProvider),
db_provider,
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
5,
Arc::new(crate::domain::ports::metrics::NoopMetrics),
)
}
#[tokio::test]
async fn chat_cleanup_rejects_invalid_payload() {
use crate::domain::service::test_helpers::inmem_db;
let db = inmem_db().await;
let handler =
build_chat_handler(crate::domain::service::test_helpers::mock_db_provider(db));
let msg = make_msg(); let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Reject(_)),
"invalid payload should be rejected"
);
}
#[tokio::test]
async fn chat_cleanup_rejects_active_chat() {
use crate::domain::service::test_helpers::{inmem_db, mock_db_provider};
let db = inmem_db().await;
let handler = build_chat_handler(mock_db_provider(db));
let msg = make_chat_cleanup_payload(uuid::Uuid::new_v4());
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Reject(_)),
"active/non-existent chat should be rejected"
);
}
#[tokio::test]
async fn chat_cleanup_succeeds_empty_chat() {
use crate::domain::repos::ChatRepository as _;
use crate::domain::service::test_helpers::{inmem_db, mock_db_provider};
let db = inmem_db().await;
let db_provider = mock_db_provider(db.clone());
let chat_repo =
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
});
let tenant_id = uuid::Uuid::new_v4();
let user_id = uuid::Uuid::new_v4();
let chat_id = uuid::Uuid::new_v4();
let scope = modkit_security::AccessScope::allow_all();
let conn = db_provider.conn().unwrap();
let chat = crate::domain::models::Chat {
id: chat_id,
tenant_id,
user_id,
model: "test-model".to_owned(),
title: Some("test".to_owned()),
is_temporary: false,
created_at: time::OffsetDateTime::now_utc(),
updated_at: time::OffsetDateTime::now_utc(),
};
chat_repo.create(&conn, &scope, chat).await.unwrap();
chat_repo.soft_delete(&conn, &scope, chat_id).await.unwrap();
let handler = build_chat_handler(db_provider);
let msg = make_chat_cleanup_payload(chat_id);
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Ok),
"empty soft-deleted chat should succeed, got: {result:?}"
);
}
#[tokio::test]
async fn deserialize_chat_cleanup_payload() {
let chat_id = uuid::Uuid::new_v4();
let msg = make_chat_cleanup_payload(chat_id);
let payload: ChatCleanupPayload =
serde_json::from_slice(&msg.payload).expect("deserialization should succeed");
assert_eq!(payload.chat_id, chat_id);
assert_eq!(
payload.reason,
crate::domain::repos::CleanupReason::ChatSoftDelete
);
}
async fn seed_pending_attachment(
db: &Arc<DbProvider>,
chat_id: uuid::Uuid,
tenant_id: uuid::Uuid,
provider_file_id: Option<&str>,
) -> uuid::Uuid {
use crate::domain::repos::{AttachmentRepository as _, InsertAttachmentParams};
let repo = crate::infra::db::repo::attachment_repo::AttachmentRepository;
let scope = modkit_security::AccessScope::allow_all();
let conn = db.conn().unwrap();
let att_id = uuid::Uuid::new_v4();
repo.insert(
&conn,
&scope,
InsertAttachmentParams {
id: att_id,
tenant_id,
chat_id,
uploaded_by_user_id: uuid::Uuid::new_v4(),
filename: "test.txt".to_owned(),
content_type: "text/plain".to_owned(),
size_bytes: 100,
storage_backend: "openai".to_owned(),
attachment_kind: "document".to_owned(),
for_file_search: false,
for_code_interpreter: false,
},
)
.await
.expect("insert attachment");
if let Some(pfid) = provider_file_id {
use crate::domain::repos::SetUploadedParams;
repo.cas_set_uploaded(
&conn,
&scope,
SetUploadedParams {
id: att_id,
provider_file_id: pfid.to_owned(),
size_bytes: 100,
},
)
.await
.expect("set uploaded");
}
repo.mark_attachments_pending_for_chat(&conn, chat_id)
.await
.expect("mark pending");
att_id
}
async fn seed_deleted_chat(db: &Arc<DbProvider>) -> (uuid::Uuid, uuid::Uuid) {
use crate::domain::repos::ChatRepository as _;
let chat_repo =
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
});
let tenant_id = uuid::Uuid::new_v4();
let chat_id = uuid::Uuid::new_v4();
let scope = modkit_security::AccessScope::allow_all();
let conn = db.conn().unwrap();
let chat = crate::domain::models::Chat {
id: chat_id,
tenant_id,
user_id: uuid::Uuid::new_v4(),
model: "test-model".to_owned(),
title: Some("test".to_owned()),
is_temporary: false,
created_at: time::OffsetDateTime::now_utc(),
updated_at: time::OffsetDateTime::now_utc(),
};
chat_repo.create(&conn, &scope, chat).await.unwrap();
chat_repo.soft_delete(&conn, &scope, chat_id).await.unwrap();
(chat_id, tenant_id)
}
#[tokio::test]
async fn chat_cleanup_processes_pending_attachment_success() {
use crate::domain::repos::AttachmentRepository as _;
use crate::domain::service::test_helpers::inmem_db;
let db = inmem_db().await;
let db_provider = crate::domain::service::test_helpers::mock_db_provider(db.clone());
let (chat_id, tenant_id) = seed_deleted_chat(&db_provider).await;
seed_pending_attachment(&db_provider, chat_id, tenant_id, Some("file-123")).await;
let handler = build_chat_handler(Arc::clone(&db_provider));
let msg = make_chat_cleanup_payload(chat_id);
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Ok),
"should succeed with NoopFileStorage, got: {result:?}"
);
let conn = db_provider.conn().unwrap();
let repo = crate::infra::db::repo::attachment_repo::AttachmentRepository;
let pending = repo
.find_pending_cleanup_by_chat(&conn, chat_id)
.await
.unwrap();
assert!(pending.is_empty(), "no attachments should remain pending");
}
#[tokio::test]
async fn chat_cleanup_retries_on_provider_failure() {
use crate::domain::repos::AttachmentRepository as _;
use crate::domain::service::test_helpers::{
FailingFileStorage, NoopVectorStoreProvider, inmem_db,
};
let db = inmem_db().await;
let db_provider = crate::domain::service::test_helpers::mock_db_provider(db.clone());
let (chat_id, tenant_id) = seed_deleted_chat(&db_provider).await;
seed_pending_attachment(&db_provider, chat_id, tenant_id, Some("file-456")).await;
let handler = ChatCleanupHandler::new(
Arc::new(FailingFileStorage),
Arc::new(NoopVectorStoreProvider),
Arc::clone(&db_provider),
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
5, Arc::new(crate::domain::ports::metrics::NoopMetrics),
);
let msg = make_chat_cleanup_payload(chat_id);
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Retry),
"should retry on provider failure, got: {result:?}"
);
let conn = db_provider.conn().unwrap();
let repo = crate::infra::db::repo::attachment_repo::AttachmentRepository;
let pending = repo
.find_pending_cleanup_by_chat(&conn, chat_id)
.await
.unwrap();
assert_eq!(pending.len(), 1, "attachment should still be pending");
assert_eq!(
pending[0].cleanup_attempts, 1,
"attempts should be incremented"
);
assert!(
pending[0].last_cleanup_error.is_some(),
"error should be recorded"
);
}
#[tokio::test]
async fn chat_cleanup_terminal_failure_at_max_attempts() {
use crate::domain::repos::AttachmentRepository as _;
use crate::domain::service::test_helpers::{
FailingFileStorage, NoopVectorStoreProvider, inmem_db,
};
let db = inmem_db().await;
let db_provider = crate::domain::service::test_helpers::mock_db_provider(db.clone());
let (chat_id, tenant_id) = seed_deleted_chat(&db_provider).await;
seed_pending_attachment(&db_provider, chat_id, tenant_id, Some("file-789")).await;
let handler = ChatCleanupHandler::new(
Arc::new(FailingFileStorage),
Arc::new(NoopVectorStoreProvider),
Arc::clone(&db_provider),
crate::infra::db::repo::chat_repo::ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
1, Arc::new(crate::domain::ports::metrics::NoopMetrics),
);
let msg = make_chat_cleanup_payload(chat_id);
let result = handler.handle(&msg).await;
assert!(
matches!(result, MessageResult::Ok),
"all attachments terminal -> should succeed, got: {result:?}"
);
let conn = db_provider.conn().unwrap();
let repo = crate::infra::db::repo::attachment_repo::AttachmentRepository;
let pending = repo
.find_pending_cleanup_by_chat(&conn, chat_id)
.await
.unwrap();
assert!(
pending.is_empty(),
"no pending attachments -- the one we had should be 'failed'"
);
let failed = repo
.count_failed_cleanup_by_chat(&conn, chat_id)
.await
.unwrap();
assert_eq!(
failed, 1,
"one attachment should be in terminal failed state"
);
}
}