use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
use pgwire::error::PgWireResult;
use crate::control::security::identity::{AuthenticatedIdentity, Role};
use crate::control::server::pgwire::types::{int8_field, text_field};
use crate::control::state::SharedState;
use crate::types::{DatabaseId, TraceId};
/// Builds a single-query response listing dropped collections.
///
/// Rows are loaded from the credentials catalog for `DatabaseId::DEFAULT` and
/// carry the columns `tenant_id`, `name`, `owner`, `engine_type`,
/// `deactivated_at_ns`, `retention_expires_at_ns` and `size_bytes_estimate`.
///
/// * `state` - shared server state (catalog, retention settings, dispatcher).
/// * `identity` - the caller; superusers and holders of `Role::TenantAdmin`
///   see every row, everyone else only rows whose `tenant_id` equals their own.
///
/// When no catalog is available an empty result set with the same schema is
/// returned.
///
/// # Errors
///
/// Returns `PgWireError::ApiError` when the catalog fails to load the dropped
/// collections, and propagates any error raised while encoding a field.
pub async fn dropped_collections(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
    /// Converts an unsigned value to the signed int8 wire type, clamping to
    /// `i64::MAX` instead of wrapping to a negative number.
    fn clamp_to_i64(value: u64) -> i64 {
        i64::try_from(value).unwrap_or(i64::MAX)
    }

    let schema = Arc::new(vec![
        int8_field("tenant_id"),
        text_field("name"),
        text_field("owner"),
        text_field("engine_type"),
        int8_field("deactivated_at_ns"),
        int8_field("retention_expires_at_ns"),
        int8_field("size_bytes_estimate"),
    ]);

    let Some(catalog) = state.credentials.catalog() else {
        // No catalog: answer with zero rows rather than an error.
        return Ok(vec![Response::Query(QueryResponse::new(
            schema,
            stream::iter(Vec::<Result<_, pgwire::error::PgWireError>>::new()),
        ))]);
    };

    let dropped = catalog
        .load_dropped_collections(DatabaseId::DEFAULT)
        .map_err(|e| pgwire::error::PgWireError::ApiError(Box::new(e)))?;

    // A poisoned settings lock falls back to the default retention window.
    let retention = state
        .retention_settings
        .read()
        .map(|r| r.retention_window())
        .unwrap_or_else(|_| crate::config::server::RetentionSettings::default().retention_window());
    // The nanosecond count may not fit in u64; saturate instead of truncating
    // so a very long retention window never turns into a short one.
    let retention_ns = u64::try_from(retention.as_nanos()).unwrap_or(u64::MAX);

    // NOTE(review): `Role::TenantAdmin` grants visibility across *all* tenants
    // here, not just the caller's own - confirm this is intended.
    let is_admin = identity.is_superuser || identity.has_role(&Role::TenantAdmin);
    let caller_tenant = identity.tenant_id.as_u64();

    let mut rows = Vec::new();
    let mut encoder = DataRowEncoder::new(schema.clone());
    for coll in &dropped {
        if !is_admin && coll.tenant_id != caller_tenant {
            continue;
        }

        let deactivated_ns = coll.modification_hlc.wall_ns;
        let expires_ns = deactivated_ns.saturating_add(retention_ns);
        let engine_type = coll.collection_type.as_str();

        // Prefer the recorded estimate; only when it is zero ask the engine,
        // treating any failure of that lookup as size 0.
        let size_estimate = if coll.size_bytes_estimate > 0 {
            coll.size_bytes_estimate
        } else {
            query_collection_size(state, coll.tenant_id, &coll.name)
                .await
                .unwrap_or(0)
        };

        // The tenant id is an identifier, not a quantity: keep the plain cast
        // (ids above i64::MAX wrap) because clamping would make distinct
        // tenants collide.
        encoder.encode_field(&(coll.tenant_id as i64))?;
        encoder.encode_field(&coll.name.as_str())?;
        encoder.encode_field(&coll.owner.as_str())?;
        encoder.encode_field(&engine_type)?;
        encoder.encode_field(&clamp_to_i64(deactivated_ns))?;
        encoder.encode_field(&clamp_to_i64(expires_ns))?;
        encoder.encode_field(&clamp_to_i64(size_estimate))?;
        rows.push(Ok(encoder.take_row()));
    }

    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(rows),
    ))])
}
/// Asks the engine for the size in bytes of one collection.
///
/// Sends a `MetaOp::QueryCollectionSize` request through
/// `dispatch_to_core(0, ..)` at background priority and waits up to 500 ms for
/// the reply. The first eight bytes of the reply payload are decoded as a
/// little-endian `u64`.
///
/// * `tenant_id` - tenant that owns the collection.
/// * `collection` - collection name.
///
/// Returns `None` when dispatch fails, the wait times out, the response
/// channel closes, the response status is not `Status::Ok`, or the payload is
/// shorter than eight bytes. The pending-request registration is cancelled on
/// dispatch failure and on timeout.
async fn query_collection_size(
    state: &SharedState,
    tenant_id: u64,
    collection: &str,
) -> Option<u64> {
    use crate::bridge::envelope::{PhysicalPlan, Priority, Request, Status};
    use crate::bridge::physical_plan::MetaOp;
    use crate::types::{DatabaseId, ReadConsistency, TenantId, VShardId};

    let request_id = state.next_request_id();
    let timeout = std::time::Duration::from_millis(500);
    let request = Request {
        request_id,
        tenant_id: TenantId::new(tenant_id),
        database_id: DatabaseId::DEFAULT,
        vshard_id: VShardId::new(0),
        plan: PhysicalPlan::Meta(MetaOp::QueryCollectionSize {
            tenant_id,
            name: collection.to_string(),
        }),
        deadline: std::time::Instant::now() + timeout,
        priority: Priority::Background,
        trace_id: TraceId::generate(),
        consistency: ReadConsistency::Eventual,
        idempotency_key: None,
        event_source: crate::event::EventSource::User,
        user_roles: Vec::new(),
        user_id: None,
        statement_digest: None,
    };

    // Register before dispatching so the reply cannot arrive unobserved.
    let mut rx = state.tracker.register(request_id);
    {
        // Scoped so the std mutex guard is released before the await below;
        // a poisoned lock is recovered rather than propagated.
        let mut dispatcher = state.dispatcher.lock().unwrap_or_else(|p| p.into_inner());
        if dispatcher.dispatch_to_core(0, request).is_err() {
            state.tracker.cancel(&request_id);
            return None;
        }
    }

    let resp = match tokio::time::timeout(timeout, rx.recv()).await {
        Ok(Some(resp)) => resp,
        // Channel closed without a reply.
        Ok(None) => return None,
        Err(_elapsed) => {
            // Nobody is waiting any more: release the registration instead of
            // leaving it behind for a reply that may never come.
            state.tracker.cancel(&request_id);
            return None;
        }
    };

    if resp.status != Status::Ok {
        return None;
    }

    let bytes = resp.payload.as_ref();
    // Checked slice: payloads shorter than eight bytes yield `None`.
    let prefix: [u8; 8] = bytes.get(..8)?.try_into().ok()?;
    Some(u64::from_le_bytes(prefix))
}