Skip to main content

mirror_log/
pipeline.rs

1use std::io::{self, BufRead};
2
3use rusqlite::{Connection, Result};
4
5use crate::{chunk, log};
6
/// Content whose byte length exceeds this value is chunked by default.
pub const AUTO_CHUNK_THRESHOLD: usize = 2_000;
/// Default `chunk_size` handed to `chunk::create_chunks` when chunking applies.
pub const DEFAULT_CHUNK_SIZE: usize = 1_500;
9
/// A stage of the ingest pipeline; [`CANONICAL_PIPELINE`] lists them in canonical order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    Capture,
    Persist,
    Structure,
    Enrich,
}

impl PipelineStage {
    /// Returns the lowercase name of this stage (e.g. `"capture"`).
    ///
    /// `const` so the name can also be used in constant contexts.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            PipelineStage::Capture => "capture",
            PipelineStage::Persist => "persist",
            PipelineStage::Structure => "structure",
            PipelineStage::Enrich => "enrich",
        }
    }
}

/// Formats the stage as its [`PipelineStage::as_str`] name.
// Full paths are used because this file imports `rusqlite::Result`, which would
// otherwise shadow `std::fmt::Result`.
impl std::fmt::Display for PipelineStage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Every pipeline stage, in canonical order.
pub const CANONICAL_PIPELINE: [PipelineStage; 4] = [
    PipelineStage::Capture,
    PipelineStage::Persist,
    PipelineStage::Structure,
    PipelineStage::Enrich,
];
35
/// A governance layer; [`GOVERNANCE_ORDER`] lists them in canonical order
/// (presumably highest precedence first — TODO confirm with the governance spec).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GovernanceLayer {
    Law,
    Principle,
    Right,
    Rule,
    Guideline,
}

impl GovernanceLayer {
    /// Returns the lowercase name of this layer (e.g. `"law"`).
    ///
    /// `const` so the name can also be used in constant contexts.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            GovernanceLayer::Law => "law",
            GovernanceLayer::Principle => "principle",
            GovernanceLayer::Right => "right",
            GovernanceLayer::Rule => "rule",
            GovernanceLayer::Guideline => "guideline",
        }
    }
}

/// Formats the layer as its [`GovernanceLayer::as_str`] name.
// Full paths are used because this file imports `rusqlite::Result`, which would
// otherwise shadow `std::fmt::Result`.
impl std::fmt::Display for GovernanceLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Every governance layer, in canonical order.
pub const GOVERNANCE_ORDER: [GovernanceLayer; 5] = [
    GovernanceLayer::Law,
    GovernanceLayer::Principle,
    GovernanceLayer::Right,
    GovernanceLayer::Rule,
    GovernanceLayer::Guideline,
];
64
65pub struct IngestRequest<'a> {
66    pub source: &'a str,
67    pub content: &'a str,
68    pub meta: Option<&'a str>,
69    pub chunk_threshold: usize,
70    pub chunk_size: usize,
71}
72
73impl<'a> IngestRequest<'a> {
74    pub fn new(source: &'a str, content: &'a str, meta: Option<&'a str>) -> Self {
75        Self {
76            source,
77            content,
78            meta,
79            chunk_threshold: AUTO_CHUNK_THRESHOLD,
80            chunk_size: DEFAULT_CHUNK_SIZE,
81        }
82    }
83
84    pub fn with_chunking(mut self, threshold: usize, chunk_size: usize) -> Self {
85        self.chunk_threshold = threshold;
86        self.chunk_size = chunk_size;
87        self
88    }
89
90    fn should_chunk(&self) -> bool {
91        self.chunk_size > 0 && self.content.len() > self.chunk_threshold
92    }
93}
94
/// Outcome of ingesting a single event with `ingest_single`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngestResult {
    /// Identifier of the appended event, taken from the log receipt.
    pub event_id: String,
    /// Timestamp taken from the log receipt (units defined by the `log` module — TODO confirm).
    pub timestamp: i64,
    /// Number of chunks created for the event; `0` when it was not chunked.
    pub chunk_count: usize,
}

/// Running totals accumulated by `ingest_reader` across every flushed batch.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestBatchResult {
    /// Identifiers of every appended event, in input order.
    pub event_ids: Vec<String>,
    /// Sum of chunks created across all events.
    pub total_chunks: usize,
    /// Number of events that produced at least one chunk.
    pub chunked_events: usize,
}
108
109pub fn ingest_single(conn: &Connection, request: IngestRequest<'_>) -> Result<IngestResult> {
110    let tx = conn.unchecked_transaction()?;
111    let receipt =
112        log::append_with_receipt_in_tx(&tx, request.source, request.content, request.meta)?;
113
114    let chunk_count = if request.should_chunk() {
115        chunk::create_chunks(
116            &tx,
117            &receipt.id,
118            request.content,
119            receipt.timestamp,
120            request.chunk_size,
121        )?
122    } else {
123        0
124    };
125
126    // Enrichment: Placeholder for embedding generation
127    #[cfg(feature = "embedding")]
128    {
129        // Embedding generation would occur here in a full implementation
130        // For now, we just log that it's happening at the enrich stage
131        eprintln!(
132            "Embedding generation placeholder - would process event: {}",
133            receipt.id
134        );
135    }
136
137    tx.commit()?;
138
139    Ok(IngestResult {
140        event_id: receipt.id,
141        timestamp: receipt.timestamp,
142        chunk_count,
143    })
144}
145
146pub fn ingest_stdin(
147    conn: &Connection,
148    source: &str,
149    meta: Option<&str>,
150    batch_size: usize,
151) -> io::Result<IngestBatchResult> {
152    ingest_stdin_with_policy(
153        conn,
154        source,
155        meta,
156        batch_size,
157        AUTO_CHUNK_THRESHOLD,
158        DEFAULT_CHUNK_SIZE,
159    )
160}
161
162pub fn ingest_stdin_with_policy(
163    conn: &Connection,
164    source: &str,
165    meta: Option<&str>,
166    batch_size: usize,
167    chunk_threshold: usize,
168    chunk_size: usize,
169) -> io::Result<IngestBatchResult> {
170    let stdin = io::stdin();
171    let reader = stdin.lock();
172    ingest_reader(
173        conn,
174        source,
175        meta,
176        reader,
177        batch_size,
178        chunk_threshold,
179        chunk_size,
180    )
181}
182
183pub fn ingest_reader<R: BufRead>(
184    conn: &Connection,
185    source: &str,
186    meta: Option<&str>,
187    reader: R,
188    batch_size: usize,
189    chunk_threshold: usize,
190    chunk_size: usize,
191) -> io::Result<IngestBatchResult> {
192    let effective_batch_size = batch_size.max(1);
193    let mut result = IngestBatchResult::default();
194    let mut batch: Vec<String> = Vec::new();
195
196    for line in reader.lines() {
197        let line = line?;
198        let trimmed = line.trim();
199        if trimmed.is_empty() {
200            continue;
201        }
202        batch.push(trimmed.to_string());
203
204        if batch.len() >= effective_batch_size {
205            flush_batch(
206                conn,
207                source,
208                meta,
209                &mut batch,
210                chunk_threshold,
211                chunk_size,
212                &mut result,
213            )?;
214        }
215    }
216
217    if !batch.is_empty() {
218        flush_batch(
219            conn,
220            source,
221            meta,
222            &mut batch,
223            chunk_threshold,
224            chunk_size,
225            &mut result,
226        )?;
227    }
228
229    Ok(result)
230}
231
232fn flush_batch(
233    conn: &Connection,
234    source: &str,
235    meta: Option<&str>,
236    batch: &mut Vec<String>,
237    chunk_threshold: usize,
238    chunk_size: usize,
239    result: &mut IngestBatchResult,
240) -> io::Result<()> {
241    let tx = conn.unchecked_transaction().map_err(io::Error::other)?;
242    let content_refs: Vec<&str> = batch.iter().map(String::as_str).collect();
243    let receipts = log::append_batch_with_receipts_in_tx(&tx, source, &content_refs, meta)
244        .map_err(io::Error::other)?;
245
246    for (content, receipt) in batch.iter().zip(receipts.into_iter()) {
247        let event_id = receipt.id;
248        result.event_ids.push(event_id.clone());
249
250        if chunk_size > 0 && content.len() > chunk_threshold {
251            let chunk_count =
252                chunk::create_chunks(&tx, &event_id, content, receipt.timestamp, chunk_size)
253                    .map_err(io::Error::other)?;
254            if chunk_count > 0 {
255                result.chunked_events += 1;
256                result.total_chunks += chunk_count;
257            }
258        }
259    }
260
261    tx.commit().map_err(io::Error::other)?;
262    batch.clear();
263    Ok(())
264}