use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::catalog::backend::TxOptions;
use crate::session::JammiSession;
use crate::store::mutable::definition::{MutableTableDefinitionBuilder, MutableTableId};
use crate::tenant::TenantId;
use super::error::EphemeralError;
use super::event::{SessionLifecycleEvent, SessionLifecycleRecord};
use super::scanner::ActiveSessions;
use super::topic;
/// Prefix prepended to every physical table id built by `physical_id`.
const EPHEMERAL_PREFIX: &str = "__eph_";
/// Number of characters in a physical id that are not available to the
/// logical name: the prefix, 32 characters for the session id (rendered with
/// `Uuid::as_simple` in `physical_id`), and the single `_` separator.
const RESERVED_ID_CHARS: usize = EPHEMERAL_PREFIX.len() + 32 + 1;
/// Longest logical table name, in bytes, that `physical_id` accepts, so that
/// the full physical id stays within 63 bytes.
// NOTE(review): 63 presumably mirrors an identifier-length limit of the
// storage backend — confirm against the backend's documentation.
pub const MAX_LOGICAL_NAME_LEN: usize = 63 - RESERVED_ID_CHARS;
/// A tenant-bound session that owns a set of ephemeral mutable tables.
///
/// Tables are created through the parent [`JammiSession`] and deleted when the
/// session is closed, force-closed, or (best effort) dropped.
pub struct EphemeralSession {
// Parent session used for table creation, SQL execution and event publishing.
parent: Arc<JammiSession>,
// Random (v4) id generated in `open`; embedded in every physical table id.
session_id: Uuid,
// Tenant taken from the parent session when the session was opened.
tenant: TenantId,
// Wall-clock time captured in `open`; the base for `deadline`.
created_at: DateTime<Utc>,
// Lifetime of the session, measured from `created_at`.
timeout: Duration,
// Logical table name -> physical table id, for tables created by this session.
tables: BTreeMap<String, MutableTableId>,
// Set by `delete_all`; makes `Drop` skip its best-effort cleanup.
closed: bool,
// Registry that this session upserts itself into and removes itself from.
active: ActiveSessions,
}
impl EphemeralSession {
/// Opens a new ephemeral session bound to the tenant of `parent`.
///
/// The session is first registered in `active` and then an `Opened`
/// lifecycle event is published.
///
/// # Errors
///
/// Returns [`EphemeralError::NoTenantBinding`] when `parent` has no tenant,
/// or the error produced while publishing the `Opened` event. In the latter
/// case the registration in `active` is rolled back before returning.
pub async fn open(
    parent: Arc<JammiSession>,
    timeout: Duration,
    active: ActiveSessions,
) -> Result<Self, EphemeralError> {
    let tenant = parent.tenant().ok_or(EphemeralError::NoTenantBinding)?;
    let session = Self {
        parent,
        session_id: Uuid::new_v4(),
        tenant,
        created_at: Utc::now(),
        timeout,
        tables: BTreeMap::new(),
        closed: false,
        active,
    };
    session.register_snapshot();
    if let Err(err) = session.emit(SessionLifecycleEvent::Opened, 0, None).await {
        // The caller never receives this session, and `Drop` returns early for
        // a session without tables, so nothing else would remove the entry
        // that `register_snapshot` just added. Undo it here.
        let _ = session.active.remove(&session.session_id);
        return Err(err);
    }
    Ok(session)
}
/// Publishes the session's current state (id, tenant, deadline and the
/// physical ids of its tables) to the `active` registry.
fn register_snapshot(&self) {
    let physical_ids = self
        .tables
        .values()
        .cloned()
        .collect::<Vec<MutableTableId>>();
    let expires_at = self.deadline();
    self.active
        .upsert(self.session_id, self.tenant, expires_at, physical_ids);
}
/// Returns the instant at which the session expires: `created_at + timeout`.
///
/// Saturates to `DateTime::<Utc>::MAX_UTC` when the timeout cannot be
/// represented as a `chrono::Duration` or when adding it to `created_at`
/// would leave the representable date range.
fn deadline(&self) -> DateTime<Utc> {
    chrono::Duration::from_std(self.timeout)
        .ok()
        // `created_at + delta` panics on overflow; the checked form lets an
        // oversized timeout fall through to the same "never expires" fallback.
        .and_then(|delta| self.created_at.checked_add_signed(delta))
        .unwrap_or(DateTime::<Utc>::MAX_UTC)
}
/// Returns the id generated for this session when it was opened.
pub fn session_id(&self) -> Uuid {
self.session_id
}
/// Returns the tenant this session is bound to.
pub fn tenant(&self) -> TenantId {
self.tenant
}
/// Returns the time at which the session was opened.
pub fn created_at(&self) -> DateTime<Utc> {
self.created_at
}
/// Returns the timeout the session was opened with.
pub fn timeout(&self) -> Duration {
self.timeout
}
/// Reports whether the session's deadline has been reached at `now`.
///
/// The deadline itself counts as expired.
pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
    self.deadline() <= now
}
/// Looks up the physical table id registered under the logical `name`.
///
/// Returns `None` when this session has no table with that name.
pub fn physical_table_id(&self, name: &str) -> Option<&MutableTableId> {
self.tables.get(name)
}
/// Builds the physical table id for the logical `name`:
/// the ephemeral prefix, the session id in simple form, `_`, then `name`.
///
/// # Errors
///
/// Returns [`EphemeralError::NameTooLong`] when `name` is longer than
/// `MAX_LOGICAL_NAME_LEN` bytes, or [`EphemeralError::Storage`] when
/// `MutableTableId::new` rejects the assembled id.
fn physical_id(&self, name: &str) -> Result<MutableTableId, EphemeralError> {
    if name.len() <= MAX_LOGICAL_NAME_LEN {
        let session_hex = self.session_id.as_simple();
        let raw = format!("{EPHEMERAL_PREFIX}{session_hex}_{name}");
        return MutableTableId::new(raw).map_err(|err| EphemeralError::Storage(err.to_string()));
    }
    Err(EphemeralError::NameTooLong {
        name: name.to_owned(),
        max: MAX_LOGICAL_NAME_LEN,
    })
}
/// Creates a tenant-scoped mutable table for this session and records it
/// under the logical `name`.
///
/// After the table is created through the parent session, the registry
/// snapshot is refreshed so that it lists the new table.
///
/// # Errors
///
/// Returns [`EphemeralError::DuplicateTable`] when `name` is already in use
/// by this session, the error from `physical_id` when the name is rejected,
/// or [`EphemeralError::Storage`] when building the definition or creating
/// the table fails.
pub async fn create_ephemeral_table(
    &mut self,
    name: &str,
    schema: SchemaRef,
    primary_key: Vec<String>,
) -> Result<(), EphemeralError> {
    if self.tables.contains_key(name) {
        return Err(EphemeralError::DuplicateTable(name.to_owned()));
    }

    let physical = self.physical_id(name)?;
    let built = MutableTableDefinitionBuilder::new(physical.clone(), schema)
        .primary_key(primary_key)
        .tenant(Some(self.tenant))
        .build();
    let definition = match built {
        Ok(definition) => definition,
        Err(err) => return Err(EphemeralError::Storage(err.to_string())),
    };

    if let Err(err) = self.parent.create_mutable_table(definition).await {
        return Err(EphemeralError::Storage(err.to_string()));
    }

    // Only record the table once the backend has accepted it.
    self.tables.insert(name.to_owned(), physical);
    self.register_snapshot();
    Ok(())
}
/// Inserts `batch` into the session table registered under `name`.
///
/// The insert runs inside a backend transaction whose tenant is set to the
/// session's tenant. On success, returns the value reported by
/// `insert_batch`.
///
/// # Errors
///
/// Returns [`EphemeralError::UnknownTable`] when `name` is not a table of
/// this session, or [`EphemeralError::Storage`] when the transaction fails.
pub async fn insert(&self, name: &str, batch: RecordBatch) -> Result<u64, EphemeralError> {
    let Some(table_id) = self.tables.get(name).cloned() else {
        return Err(EphemeralError::UnknownTable(name.to_owned()));
    };
    let tenant = self.tenant;
    let registry = self.parent.mutable_tables_arc();
    let backend = self.parent.catalog().backend_arc();

    let outcome = backend
        .transaction(TxOptions::default(), move |tx| {
            Box::pin(async move {
                tx.set_tenant(Some(tenant));
                let inserted = registry
                    .insert_batch(tx, &table_id, &batch)
                    .await
                    .map_err(|err| {
                        crate::catalog::backend::BackendError::Execution(err.to_string())
                    })?;
                Ok(inserted)
            })
        })
        .await;

    outcome.map_err(|err| EphemeralError::Storage(err.to_string()))
}
/// Runs `query` against the session table `name` under the session's tenant.
///
/// Every occurrence of the literal `{table}` in `query` is replaced with the
/// table reference produced by `table_ref` before execution.
///
/// # Errors
///
/// Returns [`EphemeralError::UnknownTable`] when `name` is not a table of
/// this session, or the error from executing the resolved statement.
pub async fn sql(&self, name: &str, query: &str) -> Result<Vec<RecordBatch>, EphemeralError> {
    let qualified = self.table_ref(name)?;
    let statement = query.replace("{table}", &qualified);
    self.scoped_sql(&statement).await
}
/// Executes `sql` through the parent session inside a scope bound to the
/// session's tenant.
///
/// # Errors
///
/// Any failure is reported as [`EphemeralError::Storage`] carrying the
/// underlying error's message.
async fn scoped_sql(&self, sql: &str) -> Result<Vec<RecordBatch>, EphemeralError> {
    // The closure must own the statement, so copy it out of the borrow.
    let statement = String::from(sql);
    let outcome = self
        .parent
        .with_tenant_scoped(self.tenant, move |scope| async move {
            scope.sql(&statement).await
        })
        .await;
    match outcome {
        Ok(batches) => Ok(batches),
        Err(err) => Err(EphemeralError::Storage(err.to_string())),
    }
}
/// Returns the qualified SQL reference (`mutable.public."<physical id>"`)
/// for the session table registered under `name`.
///
/// # Errors
///
/// Returns [`EphemeralError::UnknownTable`] when `name` is not a table of
/// this session.
pub fn table_ref(&self, name: &str) -> Result<String, EphemeralError> {
    match self.tables.get(name) {
        Some(id) => Ok(format!("mutable.public.\"{}\"", id.as_str())),
        None => Err(EphemeralError::UnknownTable(name.to_owned())),
    }
}
/// Counts the rows of the session table registered under `name`.
///
/// # Errors
///
/// Returns [`EphemeralError::UnknownTable`] when `name` is not a table of
/// this session, or the error from running the count query.
pub async fn count_rows(&self, name: &str) -> Result<u64, EphemeralError> {
    match self.tables.get(name) {
        Some(id) => self.count_physical_rows(id).await,
        None => Err(EphemeralError::UnknownTable(name.to_owned())),
    }
}
/// Consumes the session and deletes its tables via `delete_all`, passing
/// `SessionLifecycleEvent::Closed` as the lifecycle event.
///
/// # Errors
///
/// Propagates whatever `delete_all` returns.
pub async fn close(mut self) -> Result<(), EphemeralError> {
self.delete_all(SessionLifecycleEvent::Closed).await
}
/// Consumes the session and deletes its tables via `delete_all`, passing
/// `SessionLifecycleEvent::TimedOut` as the lifecycle event.
///
/// # Errors
///
/// Propagates whatever `delete_all` returns.
pub async fn force_close(mut self) -> Result<(), EphemeralError> {
self.delete_all(SessionLifecycleEvent::TimedOut).await
}
/// Deletes every table of the session and reports `event`.
///
/// The table map is emptied and the session is marked closed up front, so
/// the later `Drop` does no further cleanup. If the session is no longer
/// present in `active`, nothing is deleted and `Ok(())` is returned.
// NOTE(review): a missing registry entry presumably means another party
// already claimed the cleanup — confirm against the scanner.
///
/// # Errors
///
/// Propagates the error from `delete_tables_and_emit`, or returns
/// [`EphemeralError::PartialDeletionFailure`] when it reports failures.
async fn delete_all(&mut self, event: SessionLifecycleEvent) -> Result<(), EphemeralError> {
    let doomed: Vec<MutableTableId> = std::mem::take(&mut self.tables).into_values().collect();
    self.closed = true;

    let still_registered = self.active.remove(&self.session_id);
    if !still_registered {
        return Ok(());
    }

    let outcome = super::scanner::delete_tables_and_emit(
        &self.parent,
        self.session_id,
        self.tenant,
        &doomed,
        event,
    )
    .await?;

    if !outcome.failures.is_empty() {
        return Err(EphemeralError::PartialDeletionFailure {
            session_id: self.session_id,
            failed: outcome.failures.len(),
            total: doomed.len(),
        });
    }
    Ok(())
}
/// Runs `SELECT COUNT(*)` against the physical table `id` and returns the
/// result.
///
/// Yields `0` when the query returns no batch, when the first batch has no
/// `Int64` column named `n`, when that column is empty, or when the value is
/// negative.
///
/// # Errors
///
/// Propagates the error from executing the query.
async fn count_physical_rows(&self, id: &MutableTableId) -> Result<u64, EphemeralError> {
    let query = format!(
        "SELECT COUNT(*) AS n FROM mutable.public.\"{}\"",
        id.as_str()
    );
    let batches = self.scoped_sql(&query).await?;

    let Some(first_batch) = batches.first() else {
        return Ok(0);
    };
    let counts = first_batch
        .column_by_name("n")
        .and_then(|col| col.as_any().downcast_ref::<arrow::array::Int64Array>());
    let raw = match counts {
        Some(values) if !values.is_empty() => values.value(0),
        _ => 0,
    };
    // Negative values fail the conversion and are clamped to zero.
    Ok(u64::try_from(raw).unwrap_or(0))
}
/// Builds a lifecycle record for `event`, stamped with the current time and
/// the session's current table count, and publishes it through the parent
/// session.
///
/// # Errors
///
/// Propagates the error from `topic::publish_lifecycle`.
async fn emit(
    &self,
    event: SessionLifecycleEvent,
    deleted_row_count: u64,
    details: Option<serde_json::Value>,
) -> Result<(), EphemeralError> {
    let tenant_id = self.tenant.to_string();
    let occurred_at = Utc::now();
    let ephemeral_table_count = self.tables.len();
    let record = SessionLifecycleRecord {
        session_id: self.session_id,
        tenant_id,
        event,
        occurred_at,
        ephemeral_table_count,
        deleted_row_count,
        details,
    };
    topic::publish_lifecycle(&self.parent, self.tenant, &record).await
}
}
/// Best-effort cleanup for sessions that were dropped without `close()`.
///
/// Nothing happens when the session is already closed, owns no tables, or is
/// no longer present in the `active` registry. Otherwise the table deletion
/// is spawned on the current tokio runtime; without a runtime only a warning
/// is logged and the tables are left in place.
// NOTE(review): the empty-tables early return happens before the registry
// removal, so a table-less session dropped without `close()` stays in
// `active` — presumably the scanner expires it later; confirm.
impl Drop for EphemeralSession {
    fn drop(&mut self) {
        if self.closed {
            return;
        }
        if self.tables.is_empty() {
            return;
        }
        // Claim the cleanup; bail out if the entry is already gone.
        if !self.active.remove(&self.session_id) {
            return;
        }

        let session_id = self.session_id;
        let tenant = self.tenant;
        let tables = self
            .tables
            .values()
            .cloned()
            .collect::<Vec<MutableTableId>>();

        let runtime = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(_) => {
                tracing::warn!(
                    session_id = %session_id,
                    "ephemeral session dropped outside a tokio runtime; \
                     {} table(s) not deleted — use close() for guaranteed cleanup",
                    tables.len(),
                );
                return;
            }
        };

        let parent = Arc::clone(&self.parent);
        runtime.spawn(async move {
            let result = super::scanner::delete_tables_and_emit(
                &parent,
                session_id,
                tenant,
                &tables,
                SessionLifecycleEvent::Closed,
            )
            .await;
            if let Err(err) = result {
                tracing::warn!(
                    session_id = %session_id,
                    error = %err,
                    "ephemeral best-effort Drop cleanup completed with errors",
                );
            }
        });
    }
}