car-server-core 0.9.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! JSON wrappers for the meeting capability surface.
//!
//! Each FFI binding (NAPI, PyO3, server) provides a process-wide
//! [`car_meeting::MeetingRegistry`] (typically behind a `OnceLock`)
//! plus a [`car_voice::VoiceEventSink`] that fans events to its own
//! consumer. These wrappers parse the JSON inputs, stitch the meeting
//! sink chain together (ingest → persist → upstream UI), and hand
//! control to the registry.
//!
//! ## Wire shapes
//!
//! `startMeeting(meetingOptionsJson)`:
//! ```jsonc
//! {
//!   "id": "meet-2025-04-26",          // optional, auto-generated when missing
//!   "sources": ["mic", "system"],     // required, must be non-empty
//!   "title": "Pricing review",        // optional
//!   "model": "large-v3-turbo-q5_0",   // optional
//!   "language": "en",                 // optional
//!   "persist_audio": false,           // default false (Q2)
//!   "root": "/abs/.car/meetings",     // optional override
//!   "streaming": false,               // optional, default false
//!   "diarizer": false,                // optional, default false
//!   "enrolled": false                 // optional, default false
//! }
//! ```
//! Returns: `{"id":"...","title":"...","started_at":"...","voice_session_ids":["..."]}`.
//!
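//! `stopMeeting(meetingId, summarize)`:
//! Returns the persisted [`car_meeting::MeetingMetadata`] JSON. `summary`
//! is filled in only when `summarize=true`, an inference engine is
//! available, and summary generation succeeds; summary failures are logged
//! and never fail the call.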

use car_meeting::{
    list_meetings as list_meetings_fs, read_meeting, summarize_meeting, MeetingOptions,
    MeetingPaths, MeetingRegistry, MeetingSource,
};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// Parsed `startMeeting` request.
///
/// Deserialized from the JSON passed to [`start_meeting`]. Only `sources`
/// is required; every other field falls back to its default (`None` /
/// `false`) when absent.
#[derive(Debug, Clone, Deserialize)]
pub struct StartMeetingRequest {
    /// Caller-chosen id. Auto-generated UUID when missing.
    #[serde(default)]
    pub id: Option<String>,
    /// Source list — must be non-empty. `mic` and/or `system`.
    pub sources: Vec<MeetingSource>,
    /// Optional human-readable meeting title, forwarded to `MeetingOptions`.
    #[serde(default)]
    pub title: Option<String>,
    /// Optional model identifier forwarded to `MeetingOptions`
    /// (the module-level example uses `large-v3-turbo-q5_0`).
    #[serde(default)]
    pub model: Option<String>,
    /// Optional language hint (e.g. `en`), forwarded to `MeetingOptions`.
    #[serde(default)]
    pub language: Option<String>,
    /// Whether the meeting's audio is persisted; defaults to `false`.
    #[serde(default)]
    pub persist_audio: bool,
    /// Optional override of the meetings root directory.
    #[serde(default)]
    pub root: Option<PathBuf>,
    /// Enable native streaming partials on the mic source. Effective
    /// only when the runtime was built with the `parakeet` cargo
    /// feature passed through to car-meeting; silently ignored
    /// otherwise.
    #[serde(default)]
    pub streaming: bool,
    /// Attach the prepared diarizer (call `prepareDiarizer` first)
    /// to the mic source so transcripts carry per-speaker roles.
    #[serde(default)]
    pub diarizer: bool,
    /// Attach the enrollment-based pipeline so segments matching an
    /// enrolled voiceprint get `role: "enrolled_user"`.
    #[serde(default)]
    pub enrolled: bool,
}

/// Successful `startMeeting` response, serialized to JSON by
/// [`start_meeting`]. Field order determines the JSON key order.
#[derive(Debug, Clone, Serialize)]
pub struct StartMeetingResponse {
    /// Meeting id (caller-supplied or auto-generated).
    pub id: String,
    /// Meeting title as recorded in the meeting metadata.
    pub title: String,
    /// Meeting start time, formatted as RFC 3339.
    pub started_at: String,
    /// Voice session ids reported by the registry's meeting handle
    /// (presumably one per started source — confirm in car-meeting).
    pub voice_session_ids: Vec<String>,
}

/// Start a meeting described by a `startMeeting` JSON request.
///
/// Parses `request_json` into a [`StartMeetingRequest`], builds the event
/// sink chain, resolves the optional diarizer and enrolled-speaker
/// pipeline, and hands everything to `meeting_registry.start`.
///
/// The caller passes a memgine engine handle so transcript segments are
/// auto-ingested into car-memgine. Pass `None` to skip memgine ingest
/// entirely (transcripts still persist to disk). `cwd` is forwarded to the
/// registry unchanged.
///
/// A missing or blank (empty / whitespace-only) `id` is replaced with a
/// freshly generated UUID in its simple (hyphen-less) form.
///
/// Returns a serialized [`StartMeetingResponse`] on success.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when the request JSON is
/// malformed, `sources` is empty, or the registry fails to start the
/// meeting. A requested diarizer or enrolled-speaker pipeline that cannot
/// be obtained is logged and skipped rather than treated as an error.
pub async fn start_meeting(
    request_json: &str,
    meeting_registry: Arc<MeetingRegistry>,
    voice_registry: Arc<car_voice::VoiceSessionRegistry>,
    upstream_sink: Arc<dyn car_voice::VoiceEventSink>,
    memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
    cwd: Option<PathBuf>,
) -> Result<String, String> {
    let req: StartMeetingRequest =
        serde_json::from_str(request_json).map_err(|e| format!("invalid request JSON: {}", e))?;
    if req.sources.is_empty() {
        return Err("sources must contain at least one of: mic, system".to_string());
    }

    // A blank id is treated like a missing one: used verbatim it would
    // become an empty meeting directory name and an empty `<id>` in the
    // memgine speaker tag.
    let id = req
        .id
        .filter(|candidate| !candidate.trim().is_empty())
        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());

    // Build the sink chain. When memgine is provided, a single
    // `MeetingMemgineFanout` sits in front of the upstream sink for every
    // source: it ingests transcript segments (tagged per source as
    // `meeting/<id>/<source>` Conversation nodes) and then forwards each
    // event upstream untouched.
    //
    // The MeetingRegistry constructs MeetingSink internally and gives
    // it the chained upstream we hand it here.
    let chained_upstream: Arc<dyn car_voice::VoiceEventSink> = match memgine {
        Some(engine) => Arc::new(MeetingMemgineFanout {
            meeting_id: id.clone(),
            engine,
            upstream: upstream_sink,
        }),
        None => upstream_sink,
    };

    let options = MeetingOptions {
        sources: req.sources,
        root: req.root,
        persist_audio: req.persist_audio,
        model: req.model,
        language: req.language,
        title: req.title,
        streaming: req.streaming,
        diarizer: req.diarizer,
        enrolled: req.enrolled,
    };

    // Diarizer + enrolled speaker pipeline. car-server-core
    // unconditionally enables car-meeting's `diarization` feature
    // (per Cargo.toml), so the gates the FFI-side meeting helper
    // needed (its own `diarization` feature) come off here — the
    // car-meeting call-site parameter is always present.
    //
    // Both are best-effort: a missing piece is logged and the meeting
    // starts without it.
    let diarizer = if req.diarizer {
        let prepared = car_ffi_common::voice::current_prepared_diarizer();
        if prepared.is_none() {
            tracing::warn!(
                meeting_id = %id,
                "[meeting] diarizer requested but none is prepared (call prepareDiarizer first) — proceeding without"
            );
        }
        prepared
    } else {
        None
    };
    let speaker_pipeline = if req.enrolled {
        match car_ffi_common::voice::current_or_build_pipeline() {
            Ok(p) => Some(p),
            Err(e) => {
                tracing::warn!(
                    meeting_id = %id,
                    error = %e,
                    "[meeting] enrolled requested but pipeline build failed — proceeding without"
                );
                None
            }
        }
    } else {
        None
    };

    let handle = meeting_registry
        .start(
            id,
            options,
            voice_registry,
            chained_upstream,
            cwd.as_deref(),
            diarizer,
            speaker_pipeline,
        )
        .await
        .map_err(|e| format!("start_meeting: {}", e))?;

    let resp = StartMeetingResponse {
        id: handle.metadata.id.clone(),
        title: handle.metadata.title.clone(),
        started_at: handle.metadata.started_at.to_rfc3339(),
        voice_session_ids: handle.voice_session_ids.clone(),
    };
    serde_json::to_string(&resp).map_err(|e| e.to_string())
}

/// Stop a running meeting and return its persisted metadata as JSON.
///
/// If `summarize=true` and an `inference` engine is supplied, the
/// transcript is re-read from disk and summarized with the meeting's
/// model. The summary is then persisted into `meeting.json`.
///
/// Summary work is strictly best-effort. The meeting is still returned, and
/// the reason is logged, when any of these happen:
/// - no engine is available;
/// - the transcript cannot be re-read;
/// - inference fails;
/// - the summary cannot be persisted.
///
/// # Errors
///
/// Returns `Err` only when the registry fails to stop the meeting or the
/// resulting metadata cannot be serialized.
pub async fn stop_meeting(
    meeting_id: &str,
    summarize: bool,
    meeting_registry: Arc<MeetingRegistry>,
    voice_registry: Arc<car_voice::VoiceSessionRegistry>,
    inference: Option<Arc<car_inference::InferenceEngine>>,
) -> Result<String, String> {
    let mut handle = meeting_registry
        .stop(&meeting_id.to_string(), voice_registry)
        .await
        .map_err(|e| format!("stop_meeting: {}", e))?;

    if summarize {
        match inference {
            // Surface the skip so callers can tell why `summary` is empty
            // even though they asked for one.
            None => {
                tracing::warn!(
                    meeting_id = %meeting_id,
                    "summary requested but no inference engine is available; meeting still saved"
                );
            }
            Some(inference) => {
                // Re-read the transcript fresh from disk so the summary
                // sees every persisted segment, including any that
                // arrived between the registry's drain task and the
                // metadata rewrite.
                match read_meeting(&handle.paths) {
                    Ok(transcript) => {
                        match summarize_meeting(
                            &transcript,
                            inference,
                            handle.metadata.model.clone(),
                        )
                        .await
                        {
                            Ok(summary) => {
                                handle.metadata.summary = Some(summary);
                                // Persist the updated metadata with summary.
                                if let Err(e) = car_meeting::persist_meeting_metadata(
                                    &handle.paths,
                                    &handle.metadata,
                                ) {
                                    tracing::warn!(
                                        meeting_id = %meeting_id,
                                        error = %e,
                                        "failed to persist meeting summary"
                                    );
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    meeting_id = %meeting_id,
                                    error = %e,
                                    "summary generation failed; meeting still saved"
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(
                            meeting_id = %meeting_id,
                            error = %e,
                            "could not re-read transcript for summary"
                        );
                    }
                }
            }
        }
    }

    serde_json::to_string(&handle.metadata).map_err(|e| e.to_string())
}

/// List meeting ids that exist on disk under the resolved root.
pub fn list_meetings(
    root_override: Option<PathBuf>,
    cwd: Option<PathBuf>,
) -> Result<String, String> {
    let root = car_meeting::resolve_meeting_root(root_override.as_deref(), cwd.as_deref());
    let ids = list_meetings_fs(&root).map_err(|e| e.to_string())?;
    serde_json::to_string(&serde_json::json!({ "root": root, "ids": ids }))
        .map_err(|e| e.to_string())
}

/// Load one meeting's metadata and transcript from disk and return them as
/// a JSON string.
///
/// The meetings directory is resolved from `root_override` / `cwd` exactly
/// as [`list_meetings`] does; `meeting_id` then selects the meeting inside
/// that directory. Both read and serialization failures are reported as
/// `Err` with the underlying error's message.
pub fn get_meeting(
    meeting_id: &str,
    root_override: Option<PathBuf>,
    cwd: Option<PathBuf>,
) -> Result<String, String> {
    let meeting_root =
        car_meeting::resolve_meeting_root(root_override.as_deref(), cwd.as_deref());
    let owned_id = meeting_id.to_string();
    let meeting_paths = MeetingPaths::for_meeting(meeting_root, &owned_id);

    let transcript = match read_meeting(&meeting_paths) {
        Ok(loaded) => loaded,
        Err(read_err) => return Err(read_err.to_string()),
    };

    serde_json::to_string(&transcript).map_err(|ser_err| ser_err.to_string())
}

/// Per-meeting fanout wrapper: ingests transcripts into memgine with a
/// stable `meeting/<id>/<source>` speaker tag, then forwards every
/// event upstream untouched.
///
/// Holds the engine behind `std::sync::Mutex` to match the NAPI/PyO3
/// bindings' sync wrapper. The parsing/formatting logic lives in
/// [`car_meeting::extract_transcript_for_ingest`] so this fanout and the
/// `tokio::sync::Mutex`-flavoured `WsMemgineIngestSink` in
/// `car-server-core` share one source of truth — the two sinks differ
/// only in their lock primitive.
struct MeetingMemgineFanout {
    /// Meeting id handed to `extract_transcript_for_ingest`; supplies the
    /// `<id>` part of the speaker tag.
    meeting_id: String,
    /// Engine that receives `ingest_conversation` calls for transcript
    /// events. Ingest is skipped if the lock cannot be taken.
    engine: Arc<Mutex<car_memgine::MemgineEngine>>,
    /// Next sink in the chain; receives every event, transcript or not.
    upstream: Arc<dyn car_voice::VoiceEventSink>,
}

impl MeetingMemgineFanout {
    /// Best-effort memgine ingest for one event.
    ///
    /// Returns without doing anything when any of these hold:
    /// - the payload is not valid JSON;
    /// - `extract_transcript_for_ingest` finds nothing to ingest;
    /// - the engine mutex is poisoned.
    fn ingest_transcript(&self, voice_session_id: &str, event_json: &str) {
        let value: serde_json::Value = match serde_json::from_str(event_json) {
            Ok(parsed) => parsed,
            Err(_) => return,
        };
        let (speaker, text) = match car_meeting::extract_transcript_for_ingest(
            &value,
            &self.meeting_id,
            voice_session_id,
        ) {
            Some(pair) => pair,
            None => return,
        };
        if let Ok(mut guard) = self.engine.lock() {
            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
        }
    }
}

impl car_voice::VoiceEventSink for MeetingMemgineFanout {
    /// Ingest (when applicable) first, then always forward the original
    /// event to the upstream sink, regardless of the ingest outcome.
    fn send(&self, voice_session_id: &str, event_json: String) {
        self.ingest_transcript(voice_session_id, &event_json);
        self.upstream.send(voice_session_id, event_json);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Sink that swallows every event; these tests never inspect output.
    struct NoopSink;

    impl car_voice::VoiceEventSink for NoopSink {
        fn send(&self, _sid: &str, _json: String) {}
    }

    /// Drive `fut` to completion on a throwaway current-thread tokio
    /// runtime, so the tests need neither `tokio_test` nor `#[tokio::test]`.
    fn futures_block<F: std::future::Future>(fut: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build current-thread tokio runtime")
            .block_on(fut)
    }

    #[test]
    fn rejects_empty_sources() {
        let request = serde_json::json!({ "sources": [] }).to_string();
        let upstream: Arc<dyn car_voice::VoiceEventSink> = Arc::new(NoopSink);

        let outcome = futures_block(start_meeting(
            &request,
            Arc::new(MeetingRegistry::new()),
            Arc::new(car_voice::VoiceSessionRegistry::new()),
            upstream,
            None,
            None,
        ));

        let message = outcome.expect_err("an empty source list must be rejected");
        assert!(message.contains("sources"));
    }
}