use car_meeting::{
list_meetings as list_meetings_fs, read_meeting, summarize_meeting, MeetingOptions,
MeetingPaths, MeetingRegistry, MeetingSource,
};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
/// JSON payload accepted by [`start_meeting`].
///
/// Only `sources` is required; every other field falls back to its serde
/// default (`None` / `false`) when it is omitted from the payload.
#[derive(Debug, Clone, Deserialize)]
pub struct StartMeetingRequest {
    /// Caller-chosen meeting id. When absent, [`start_meeting`] generates a
    /// random v4 UUID and uses its `simple()` rendering instead.
    #[serde(default)]
    pub id: Option<String>,

    /// Sources to capture. [`start_meeting`] rejects an empty list.
    pub sources: Vec<MeetingSource>,

    /// Forwarded unchanged to `MeetingOptions::title`.
    #[serde(default)]
    pub title: Option<String>,

    /// Forwarded unchanged to `MeetingOptions::model`.
    #[serde(default)]
    pub model: Option<String>,

    /// Forwarded unchanged to `MeetingOptions::language`.
    #[serde(default)]
    pub language: Option<String>,

    /// Forwarded unchanged to `MeetingOptions::persist_audio`.
    #[serde(default)]
    pub persist_audio: bool,

    /// Forwarded unchanged to `MeetingOptions::root`.
    #[serde(default)]
    pub root: Option<PathBuf>,

    /// Forwarded unchanged to `MeetingOptions::streaming`.
    #[serde(default)]
    pub streaming: bool,

    /// Forwarded to `MeetingOptions::diarizer`. When `true`,
    /// [`start_meeting`] additionally passes the registry whatever
    /// `car_ffi_common::voice::current_prepared_diarizer()` returns.
    #[serde(default)]
    pub diarizer: bool,

    /// Forwarded to `MeetingOptions::enrolled`. When `true`,
    /// [`start_meeting`] additionally tries to obtain a speaker pipeline via
    /// `car_ffi_common::voice::current_or_build_pipeline()`; if that fails it
    /// logs a warning and starts the meeting without one.
    #[serde(default)]
    pub enrolled: bool,
}
/// JSON payload returned by [`start_meeting`] on success.
#[derive(Debug, Clone, Serialize)]
pub struct StartMeetingResponse {
    /// Id taken from the started meeting's metadata.
    pub id: String,

    /// Title taken from the started meeting's metadata.
    pub title: String,

    /// Start time from the meeting's metadata, rendered with `to_rfc3339()`.
    pub started_at: String,

    /// Voice session ids reported by the meeting handle.
    pub voice_session_ids: Vec<String>,
}
pub async fn start_meeting(
request_json: &str,
meeting_registry: Arc<MeetingRegistry>,
voice_registry: Arc<car_voice::VoiceSessionRegistry>,
upstream_sink: Arc<dyn car_voice::VoiceEventSink>,
memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
cwd: Option<PathBuf>,
) -> Result<String, String> {
let req: StartMeetingRequest =
serde_json::from_str(request_json).map_err(|e| format!("invalid request JSON: {}", e))?;
if req.sources.is_empty() {
return Err("sources must contain at least one of: mic, system".to_string());
}
let id = req
.id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
let chained_upstream: Arc<dyn car_voice::VoiceEventSink> = match memgine {
Some(engine) => Arc::new(MeetingMemgineFanout {
meeting_id: id.clone(),
engine,
upstream: upstream_sink,
}),
None => upstream_sink,
};
let options = MeetingOptions {
sources: req.sources,
root: req.root,
persist_audio: req.persist_audio,
model: req.model,
language: req.language,
title: req.title,
streaming: req.streaming,
diarizer: req.diarizer,
enrolled: req.enrolled,
};
let diarizer = if req.diarizer {
car_ffi_common::voice::current_prepared_diarizer()
} else {
None
};
let speaker_pipeline = if req.enrolled {
match car_ffi_common::voice::current_or_build_pipeline() {
Ok(p) => Some(p),
Err(e) => {
tracing::warn!(
"[meeting] enrolled requested but pipeline build failed: {} — proceeding without",
e
);
None
}
}
} else {
None
};
let handle = meeting_registry
.start(
id,
options,
voice_registry,
chained_upstream,
cwd.as_deref(),
diarizer,
speaker_pipeline,
)
.await
.map_err(|e| format!("start_meeting: {}", e))?;
let resp = StartMeetingResponse {
id: handle.metadata.id.clone(),
title: handle.metadata.title.clone(),
started_at: handle.metadata.started_at.to_rfc3339(),
voice_session_ids: handle.voice_session_ids.clone(),
};
serde_json::to_string(&resp).map_err(|e| e.to_string())
}
/// Stops the meeting `meeting_id` and returns its metadata as JSON.
///
/// When `summarize` is `true` and an `inference` engine is supplied, the
/// transcript is re-read from the meeting's paths, summarized, stored in
/// `metadata.summary`, and the metadata is persisted again. Every step of
/// that summary path is best-effort: a failure is reported through
/// `tracing::warn!` and the metadata is still returned.
///
/// # Errors
///
/// Returns a message when the registry fails to stop the meeting (prefixed
/// with `stop_meeting: `) or when the metadata cannot be serialized.
pub async fn stop_meeting(
    meeting_id: &str,
    summarize: bool,
    meeting_registry: Arc<MeetingRegistry>,
    voice_registry: Arc<car_voice::VoiceSessionRegistry>,
    inference: Option<Arc<car_inference::InferenceEngine>>,
) -> Result<String, String> {
    let mut handle = meeting_registry
        .stop(&meeting_id.to_string(), voice_registry)
        .await
        .map_err(|e| format!("stop_meeting: {}", e))?;

    match inference {
        // Summaries need both the caller's opt-in and an inference engine.
        Some(engine) if summarize => match read_meeting(&handle.paths) {
            Err(e) => {
                tracing::warn!(
                    meeting_id = %meeting_id,
                    error = %e,
                    "could not re-read transcript for summary"
                );
            }
            Ok(transcript) => {
                let outcome =
                    summarize_meeting(&transcript, engine, handle.metadata.model.clone()).await;
                match outcome {
                    Err(e) => {
                        tracing::warn!(
                            meeting_id = %meeting_id,
                            error = %e,
                            "summary generation failed; meeting still saved"
                        );
                    }
                    Ok(summary) => {
                        handle.metadata.summary = Some(summary);
                        let persisted = car_meeting::persist_meeting_metadata(
                            &handle.paths,
                            &handle.metadata,
                        );
                        if let Err(e) = persisted {
                            tracing::warn!(
                                meeting_id = %meeting_id,
                                error = %e,
                                "failed to persist meeting summary"
                            );
                        }
                    }
                }
            }
        },
        _ => {}
    }

    serde_json::to_string(&handle.metadata).map_err(|e| e.to_string())
}
/// Lists the meetings stored under the resolved meeting root.
///
/// The root is resolved by `car_meeting::resolve_meeting_root` from
/// `root_override` and `cwd`.
///
/// Returns a JSON object with two keys: `"root"` (the resolved root) and
/// `"ids"` (whatever `car_meeting::list_meetings` returned for that root).
///
/// # Errors
///
/// Returns the stringified error when listing fails, or when the root or the
/// ids cannot be represented as JSON. The latter matters if the resolved root
/// is a path type: serde refuses to serialize paths that are not valid UTF-8.
pub fn list_meetings(
    root_override: Option<PathBuf>,
    cwd: Option<PathBuf>,
) -> Result<String, String> {
    let root = car_meeting::resolve_meeting_root(root_override.as_deref(), cwd.as_deref());
    let ids = list_meetings_fs(&root).map_err(|e| e.to_string())?;

    // `json!` unwraps the result of serializing interpolated expressions, so
    // a value whose `Serialize` impl fails would panic inside the macro.
    // Convert both values up front so that failure surfaces as an `Err`.
    let root = serde_json::to_value(&root).map_err(|e| e.to_string())?;
    let ids = serde_json::to_value(&ids).map_err(|e| e.to_string())?;

    serde_json::to_string(&serde_json::json!({ "root": root, "ids": ids }))
        .map_err(|e| e.to_string())
}
/// Reads the stored meeting `meeting_id` and returns it as JSON.
///
/// The meeting root is resolved by `car_meeting::resolve_meeting_root` from
/// `root_override` and `cwd`; the meeting's on-disk locations are then
/// derived with `MeetingPaths::for_meeting`.
///
/// # Errors
///
/// Returns the stringified error when the meeting cannot be read or when the
/// result cannot be serialized.
pub fn get_meeting(
    meeting_id: &str,
    root_override: Option<PathBuf>,
    cwd: Option<PathBuf>,
) -> Result<String, String> {
    let meeting_root =
        car_meeting::resolve_meeting_root(root_override.as_deref(), cwd.as_deref());
    let owned_id = meeting_id.to_string();
    let meeting_paths = MeetingPaths::for_meeting(meeting_root, &owned_id);

    read_meeting(&meeting_paths)
        .map_err(|e| e.to_string())
        .and_then(|transcript| serde_json::to_string(&transcript).map_err(|e| e.to_string()))
}
/// Voice event sink that feeds transcripts into a memgine engine and then
/// forwards every event to another sink.
struct MeetingMemgineFanout {
    /// Id of the meeting, passed to
    /// `car_meeting::extract_transcript_for_ingest` for each event.
    meeting_id: String,

    /// Engine that receives extracted transcripts via `ingest_conversation`.
    engine: Arc<Mutex<car_memgine::MemgineEngine>>,

    /// Sink that receives every event, whether or not it was ingested.
    upstream: Arc<dyn car_voice::VoiceEventSink>,
}
impl MeetingMemgineFanout {
    /// Best-effort ingest of one event into the memgine engine.
    ///
    /// Does nothing when the event is not valid JSON, when
    /// `car_meeting::extract_transcript_for_ingest` yields no transcript for
    /// it, or when the engine lock cannot be acquired (poisoned).
    fn ingest_transcript(&self, voice_session_id: &str, event_json: &str) {
        let event = match serde_json::from_str::<serde_json::Value>(event_json) {
            Ok(event) => event,
            Err(_) => return,
        };

        let extracted =
            car_meeting::extract_transcript_for_ingest(&event, &self.meeting_id, voice_session_id);
        let (speaker, text) = match extracted {
            Some(pair) => pair,
            None => return,
        };

        // The guard lives only for this statement, so the lock is released
        // before the caller forwards the event upstream.
        if let Ok(mut engine) = self.engine.lock() {
            engine.ingest_conversation(&speaker, &text, chrono::Utc::now());
        }
    }
}

impl car_voice::VoiceEventSink for MeetingMemgineFanout {
    /// Tries to ingest the event's transcript, then forwards the event to the
    /// upstream sink unconditionally.
    fn send(&self, voice_session_id: &str, event_json: String) {
        self.ingest_transcript(voice_session_id, &event_json);
        self.upstream.send(voice_session_id, event_json);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sink that discards every event.
    struct NoopSink;

    impl car_voice::VoiceEventSink for NoopSink {
        fn send(&self, _sid: &str, _json: String) {}
    }

    /// Runs `f` to completion on a fresh current-thread Tokio runtime.
    fn futures_block<F: std::future::Future>(f: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(f)
    }

    /// A request with an empty `sources` list must fail with a message that
    /// mentions `sources`.
    #[test]
    fn rejects_empty_sources() {
        let request = r#"{"sources":[]}"#;

        let outcome = futures_block(start_meeting(
            request,
            Arc::new(MeetingRegistry::new()),
            Arc::new(car_voice::VoiceSessionRegistry::new()),
            Arc::new(NoopSink),
            None,
            None,
        ));

        assert!(matches!(&outcome, Err(message) if message.contains("sources")));
    }
}