Skip to main content

converge_knowledge/ingest/
rich_media.rs

1//! Shared rich-media ingestion and transcription contracts.
2//!
3//! These abstractions support Phase 3 work (audio processing, video
4//! transcription) without coupling the rest of the ingest pipeline to a
5//! specific transcription backend or runtime.
6
7use crate::Result;
8use crate::error::Error;
9use crate::ingest::SourceProvenance;
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::path::PathBuf;
15
16/// The media type being ingested.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum MediaKind {
20    /// Audio-only media.
21    Audio,
22    /// Video media (may also include audio track).
23    Video,
24}
25
26/// Transcription backend family.
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(rename_all = "snake_case")]
29pub enum TranscriptionEngine {
30    /// OpenAI Whisper or compatible local implementation.
31    Whisper,
32    /// Test/mock backend.
33    Mock,
34    /// Other external backend.
35    External,
36}
37
/// Request for media ingestion or preprocessing.
///
/// Construct with [`MediaIngestRequest::new`] and extend via the
/// `with_*` builder methods on the `impl` block below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MediaIngestRequest {
    /// Path to the media file.
    pub path: PathBuf,
    /// Whether this is audio or video.
    pub media_kind: MediaKind,
    /// Shared source provenance metadata.
    pub provenance: SourceProvenance,
    /// Optional language hints; interpretation is backend-specific.
    pub language_hints: Vec<String>,
    /// Source-specific metadata (codec, channels, sample-rate, etc.).
    pub metadata: HashMap<String, String>,
}
52
53impl MediaIngestRequest {
54    /// Create a new media ingest request.
55    pub fn new(
56        path: impl Into<PathBuf>,
57        media_kind: MediaKind,
58        provenance: SourceProvenance,
59    ) -> Self {
60        Self {
61            path: path.into(),
62            media_kind,
63            provenance,
64            language_hints: Vec::new(),
65            metadata: HashMap::new(),
66        }
67    }
68
69    /// Add a language hint.
70    pub fn with_language_hint(mut self, language: impl Into<String>) -> Self {
71        self.language_hints.push(language.into());
72        self
73    }
74}
75
76/// Chunking policy used when segmenting transcript text for indexing.
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
78pub struct TranscriptChunkPolicy {
79    /// Maximum transcript segment duration in milliseconds.
80    pub max_segment_duration_ms: u64,
81    /// Maximum characters per chunk.
82    pub max_chars_per_chunk: usize,
83}
84
85impl Default for TranscriptChunkPolicy {
86    fn default() -> Self {
87        Self {
88            max_segment_duration_ms: 60_000,
89            max_chars_per_chunk: 2_000,
90        }
91    }
92}
93
/// Request passed to transcription backends.
///
/// Wraps a [`MediaIngestRequest`] with transcription-specific options;
/// build with [`TranscriptionRequest::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptionRequest {
    /// Base media request.
    pub media: MediaIngestRequest,
    /// Transcript chunking hint for downstream ingestion.
    pub chunk_policy: TranscriptChunkPolicy,
    /// Whether speaker labels are desired (if backend supports them).
    pub diarization: bool,
}
104
105impl TranscriptionRequest {
106    /// Create a transcription request from a media request.
107    pub fn new(media: MediaIngestRequest) -> Self {
108        Self {
109            media,
110            chunk_policy: TranscriptChunkPolicy::default(),
111            diarization: false,
112        }
113    }
114
115    /// Enable speaker diarization if supported.
116    pub fn with_diarization(mut self) -> Self {
117        self.diarization = true;
118        self
119    }
120}
121
/// Timestamped transcript segment.
///
/// Timestamps are milliseconds relative to the start of the media;
/// segments are expected in chronological order (see
/// `TranscriptDocument::validate`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TranscriptSegment {
    /// Segment index in transcript order.
    pub index: usize,
    /// Start timestamp in milliseconds from media start.
    pub start_ms: u64,
    /// End timestamp in milliseconds from media start.
    pub end_ms: u64,
    /// Transcript text for this segment.
    pub text: String,
    /// Optional speaker identifier.
    pub speaker: Option<String>,
    /// Confidence score when provided by backend.
    pub confidence: Option<f32>,
}
138
139impl TranscriptSegment {
140    /// Create a segment.
141    pub fn new(index: usize, start_ms: u64, end_ms: u64, text: impl Into<String>) -> Self {
142        Self {
143            index,
144            start_ms,
145            end_ms,
146            text: text.into(),
147            speaker: None,
148            confidence: None,
149        }
150    }
151
152    /// Segment duration in milliseconds.
153    pub fn duration_ms(&self) -> u64 {
154        self.end_ms.saturating_sub(self.start_ms)
155    }
156
157    /// Returns `true` if timestamps are a valid non-empty span.
158    pub fn has_valid_time_range(&self) -> bool {
159        self.end_ms > self.start_ms
160    }
161}
162
/// A full transcript result for audio or video media.
///
/// Either `full_text` or `segments` (or both) may carry the transcript;
/// `effective_text` picks the best available representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptDocument {
    /// Backend engine used to produce the transcript.
    pub engine: TranscriptionEngine,
    /// Media type for this transcript.
    pub media_kind: MediaKind,
    /// Optional detected language code.
    pub language: Option<String>,
    /// Transcript segments in chronological order.
    pub segments: Vec<TranscriptSegment>,
    /// Optional backend-provided full transcript text (may be empty).
    pub full_text: String,
    /// When the transcript was produced.
    pub generated_at: DateTime<Utc>,
    /// Shared source provenance metadata.
    pub provenance: SourceProvenance,
}
181
182impl TranscriptDocument {
183    /// Create an empty transcript document.
184    pub fn new(
185        engine: TranscriptionEngine,
186        media_kind: MediaKind,
187        provenance: SourceProvenance,
188    ) -> Self {
189        Self {
190            engine,
191            media_kind,
192            language: None,
193            segments: Vec::new(),
194            full_text: String::new(),
195            generated_at: Utc::now(),
196            provenance,
197        }
198    }
199
200    /// Returns the best available transcript text for indexing.
201    pub fn effective_text(&self) -> String {
202        let trimmed = self.full_text.trim();
203        if !trimmed.is_empty() {
204            return trimmed.to_string();
205        }
206
207        self.segments
208            .iter()
209            .map(|segment| segment.text.trim())
210            .filter(|text| !text.is_empty())
211            .collect::<Vec<_>>()
212            .join("\n")
213    }
214
215    /// Validate transcript segment ordering and timestamp ranges.
216    pub fn validate(&self) -> Result<()> {
217        let mut last_end = 0u64;
218
219        for (position, segment) in self.segments.iter().enumerate() {
220            if !segment.has_valid_time_range() {
221                return Err(Error::ingest(format!(
222                    "invalid transcript segment at position {position}: end_ms ({}) must be greater than start_ms ({})",
223                    segment.end_ms, segment.start_ms
224                )));
225            }
226
227            if position > 0 && segment.start_ms < last_end {
228                return Err(Error::ingest(format!(
229                    "overlapping transcript segment at position {position}: start_ms ({}) < previous end_ms ({last_end})",
230                    segment.start_ms
231                )));
232            }
233
234            last_end = segment.end_ms;
235        }
236
237        Ok(())
238    }
239}
240
/// Transcription backend contract used by audio/video ingestion pipelines.
///
/// Implementors must be `Send + Sync` so a single backend instance can
/// be shared across async tasks.
#[async_trait]
pub trait TranscriptionBackend: Send + Sync {
    /// Backend engine identifier.
    fn engine(&self) -> TranscriptionEngine;

    /// Produce a transcript for the requested media.
    ///
    /// # Errors
    ///
    /// Returns an error when the backend cannot transcribe the
    /// requested media.
    async fn transcribe(&self, request: &TranscriptionRequest) -> Result<TranscriptDocument>;
}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ingest::{SourceKind, SourceProvenance};

    /// Build an empty mock-engine document for the given source.
    fn mock_doc(kind: SourceKind, media: MediaKind, url: &str) -> TranscriptDocument {
        let provenance = SourceProvenance::new(kind, url);
        TranscriptDocument::new(TranscriptionEngine::Mock, media, provenance)
    }

    #[test]
    fn transcript_effective_text_falls_back_to_segments() {
        let mut doc = mock_doc(SourceKind::Audio, MediaKind::Audio, "file:///meeting.m4a");
        doc.segments.extend([
            TranscriptSegment::new(0, 0, 1000, "hello world"),
            TranscriptSegment::new(1, 1000, 2000, "second segment"),
        ]);

        assert_eq!(doc.effective_text(), "hello world\nsecond segment");
    }

    #[test]
    fn transcript_validation_rejects_overlap() {
        let mut doc = mock_doc(SourceKind::Video, MediaKind::Video, "file:///clip.mp4");
        doc.segments.extend([
            TranscriptSegment::new(0, 0, 1500, "a"),
            TranscriptSegment::new(1, 1000, 2000, "b"),
        ]);

        let err = doc.validate().unwrap_err();
        assert!(err.to_string().contains("overlapping transcript segment"));
    }

    #[test]
    fn transcript_validation_accepts_monotonic_segments() {
        let mut doc = mock_doc(SourceKind::Video, MediaKind::Video, "file:///clip.mp4");
        doc.segments.extend([
            TranscriptSegment::new(0, 0, 1500, "a"),
            TranscriptSegment::new(1, 1500, 2200, "b"),
        ]);

        assert!(doc.validate().is_ok());
    }
}