use std::collections::BTreeMap;
use std::sync::Arc;
use arrow::array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use crate::session::JammiSession;
use crate::tenant::TenantId;
use crate::trigger::topic::TopicDefinition;
use crate::trigger::TopicId;
use super::error::EphemeralError;
use super::event::{SessionLifecycleRecord, SESSION_LIFECYCLE_TOPIC};
/// Builds the Arrow schema used for lifecycle payload batches.
///
/// The schema has exactly one column, `record`, typed as non-nullable UTF-8.
fn payload_schema() -> Arc<Schema> {
    let record_column = Field::new("record", DataType::Utf8, false);
    Arc::new(Schema::new(vec![record_column]))
}
/// Publishes a single session lifecycle record on the tenant's lifecycle topic.
///
/// Steps, in order:
/// 1. Look up the topic named [`SESSION_LIFECYCLE_TOPIC`] for `tenant` in the
///    session's topic repository; if none exists, register a fresh
///    [`TopicDefinition`] using [`payload_schema`].
/// 2. Register the resolved topic with the session's trigger broker (done on
///    every call).
/// 3. Serialize `record` to JSON and wrap it in a one-row, one-column batch
///    built against the resolved topic's schema.
/// 4. Publish the batch scoped to `tenant`.
///
/// # Errors
///
/// Repository, broker and publisher failures are returned as
/// [`EphemeralError::Broker`] carrying the stringified cause. A failure to
/// build the batch is reported the same way, prefixed with
/// `build lifecycle batch: `. A JSON serialization failure is converted through
/// the `?` operator into whatever [`EphemeralError`] variant its `From` impl
/// selects.
///
/// NOTE(review): the lookup followed by a conditional register is two separate
/// calls from this view; confirm the repository tolerates concurrent callers
/// racing to create the same topic.
pub(super) async fn publish_lifecycle(
    session: &JammiSession,
    tenant: TenantId,
    record: &SessionLifecycleRecord,
) -> Result<(), EphemeralError> {
    /// Wraps any stringifiable failure in [`EphemeralError::Broker`].
    fn broker_err(cause: impl ToString) -> EphemeralError {
        EphemeralError::Broker(cause.to_string())
    }

    // Resolve the tenant-scoped lifecycle topic, creating it on first use.
    let topics = session.topic_repo();
    let found = topics
        .lookup_by_name(SESSION_LIFECYCLE_TOPIC, Some(tenant))
        .await
        .map_err(broker_err)?;
    let topic = if let Some(existing) = found {
        existing
    } else {
        let fresh = TopicDefinition {
            id: TopicId::new(),
            name: SESSION_LIFECYCLE_TOPIC.to_string(),
            schema: payload_schema(),
            tenant: Some(tenant),
            broker_metadata: BTreeMap::new(),
        };
        topics.register_topic(&fresh).await.map_err(broker_err)?;
        fresh
    };

    // The broker is told about the topic before anything is published to it.
    let broker = session.trigger_broker();
    broker.register_topic(&topic).await.map_err(broker_err)?;

    // One JSON string in one row; the batch uses the resolved topic's schema,
    // so a pre-existing topic with a different schema surfaces as a build error.
    let json = serde_json::to_string(record)?;
    let batch = RecordBatch::try_new(
        Arc::clone(&topic.schema),
        vec![Arc::new(StringArray::from(vec![json]))],
    )
    .map_err(|e| EphemeralError::Broker(format!("build lifecycle batch: {e}")))?;

    let publisher = session.publisher();
    publisher
        .publish_scoped(&topic, Some(tenant), batch)
        .await
        .map_err(broker_err)?;
    Ok(())
}