1use std::io::{self, BufRead};
2
3use rusqlite::{Connection, Result};
4
5use crate::{chunk, log};
6
/// Content whose byte length exceeds this is automatically chunked
/// (see `IngestRequest::should_chunk`).
pub const AUTO_CHUNK_THRESHOLD: usize = 2000;
/// Default chunk size handed to `chunk::create_chunks` when auto-chunking.
pub const DEFAULT_CHUNK_SIZE: usize = 1500;
9
/// One stage of the canonical ingestion pipeline.
///
/// The full stage ordering is given by [`CANONICAL_PIPELINE`]; string
/// identifiers come from [`PipelineStage::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStage {
    Capture,
    Persist,
    Structure,
    Enrich,
}
17
18impl PipelineStage {
19 pub fn as_str(&self) -> &'static str {
20 match self {
21 PipelineStage::Capture => "capture",
22 PipelineStage::Persist => "persist",
23 PipelineStage::Structure => "structure",
24 PipelineStage::Enrich => "enrich",
25 }
26 }
27}
28
/// The pipeline stages in their canonical execution order:
/// capture -> persist -> structure -> enrich.
pub const CANONICAL_PIPELINE: [PipelineStage; 4] = [
    PipelineStage::Capture,
    PipelineStage::Persist,
    PipelineStage::Structure,
    PipelineStage::Enrich,
];
35
/// A layer in the governance hierarchy.
///
/// [`GOVERNANCE_ORDER`] lists the layers from `Law` down to `Guideline`;
/// string identifiers come from [`GovernanceLayer::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GovernanceLayer {
    Law,
    Principle,
    Right,
    Rule,
    Guideline,
}
44
45impl GovernanceLayer {
46 pub fn as_str(&self) -> &'static str {
47 match self {
48 GovernanceLayer::Law => "law",
49 GovernanceLayer::Principle => "principle",
50 GovernanceLayer::Right => "right",
51 GovernanceLayer::Rule => "rule",
52 GovernanceLayer::Guideline => "guideline",
53 }
54 }
55}
56
/// Governance layers ordered from most to least authoritative
/// (law first, guideline last).
pub const GOVERNANCE_ORDER: [GovernanceLayer; 5] = [
    GovernanceLayer::Law,
    GovernanceLayer::Principle,
    GovernanceLayer::Right,
    GovernanceLayer::Rule,
    GovernanceLayer::Guideline,
];
64
/// Parameters for ingesting one piece of content (see [`ingest_single`]).
pub struct IngestRequest<'a> {
    /// Source label forwarded to `log::append_with_receipt_in_tx`.
    pub source: &'a str,
    /// The content to append; chunked when longer than `chunk_threshold` bytes.
    pub content: &'a str,
    /// Optional metadata forwarded alongside the content.
    pub meta: Option<&'a str>,
    /// Byte-length threshold above which the content is chunked.
    pub chunk_threshold: usize,
    /// Chunk size passed to `chunk::create_chunks`; `0` disables chunking.
    pub chunk_size: usize,
}
72
73impl<'a> IngestRequest<'a> {
74 pub fn new(source: &'a str, content: &'a str, meta: Option<&'a str>) -> Self {
75 Self {
76 source,
77 content,
78 meta,
79 chunk_threshold: AUTO_CHUNK_THRESHOLD,
80 chunk_size: DEFAULT_CHUNK_SIZE,
81 }
82 }
83
84 pub fn with_chunking(mut self, threshold: usize, chunk_size: usize) -> Self {
85 self.chunk_threshold = threshold;
86 self.chunk_size = chunk_size;
87 self
88 }
89
90 fn should_chunk(&self) -> bool {
91 self.chunk_size > 0 && self.content.len() > self.chunk_threshold
92 }
93}
94
/// Outcome of a single-event ingest.
#[derive(Debug, Clone)]
pub struct IngestResult {
    /// Id of the appended event (from the log receipt).
    pub event_id: String,
    /// Timestamp recorded on the log receipt.
    pub timestamp: i64,
    /// Number of chunks created for the event; `0` if it was not chunked.
    pub chunk_count: usize,
}
101
/// Aggregate outcome of a batched ingest run.
#[derive(Debug, Clone, Default)]
pub struct IngestBatchResult {
    /// Ids of every event appended, in ingestion order.
    pub event_ids: Vec<String>,
    /// Total number of chunks created across all events.
    pub total_chunks: usize,
    /// Number of events that produced at least one chunk.
    pub chunked_events: usize,
}
108
109pub fn ingest_single(conn: &Connection, request: IngestRequest<'_>) -> Result<IngestResult> {
110 let tx = conn.unchecked_transaction()?;
111 let receipt =
112 log::append_with_receipt_in_tx(&tx, request.source, request.content, request.meta)?;
113
114 let chunk_count = if request.should_chunk() {
115 chunk::create_chunks(
116 &tx,
117 &receipt.id,
118 request.content,
119 receipt.timestamp,
120 request.chunk_size,
121 )?
122 } else {
123 0
124 };
125
126 #[cfg(feature = "embedding")]
128 {
129 eprintln!(
132 "Embedding generation placeholder - would process event: {}",
133 receipt.id
134 );
135 }
136
137 tx.commit()?;
138
139 Ok(IngestResult {
140 event_id: receipt.id,
141 timestamp: receipt.timestamp,
142 chunk_count,
143 })
144}
145
146pub fn ingest_stdin(
147 conn: &Connection,
148 source: &str,
149 meta: Option<&str>,
150 batch_size: usize,
151) -> io::Result<IngestBatchResult> {
152 ingest_stdin_with_policy(
153 conn,
154 source,
155 meta,
156 batch_size,
157 AUTO_CHUNK_THRESHOLD,
158 DEFAULT_CHUNK_SIZE,
159 )
160}
161
162pub fn ingest_stdin_with_policy(
163 conn: &Connection,
164 source: &str,
165 meta: Option<&str>,
166 batch_size: usize,
167 chunk_threshold: usize,
168 chunk_size: usize,
169) -> io::Result<IngestBatchResult> {
170 let stdin = io::stdin();
171 let reader = stdin.lock();
172 ingest_reader(
173 conn,
174 source,
175 meta,
176 reader,
177 batch_size,
178 chunk_threshold,
179 chunk_size,
180 )
181}
182
183pub fn ingest_reader<R: BufRead>(
184 conn: &Connection,
185 source: &str,
186 meta: Option<&str>,
187 reader: R,
188 batch_size: usize,
189 chunk_threshold: usize,
190 chunk_size: usize,
191) -> io::Result<IngestBatchResult> {
192 let effective_batch_size = batch_size.max(1);
193 let mut result = IngestBatchResult::default();
194 let mut batch: Vec<String> = Vec::new();
195
196 for line in reader.lines() {
197 let line = line?;
198 let trimmed = line.trim();
199 if trimmed.is_empty() {
200 continue;
201 }
202 batch.push(trimmed.to_string());
203
204 if batch.len() >= effective_batch_size {
205 flush_batch(
206 conn,
207 source,
208 meta,
209 &mut batch,
210 chunk_threshold,
211 chunk_size,
212 &mut result,
213 )?;
214 }
215 }
216
217 if !batch.is_empty() {
218 flush_batch(
219 conn,
220 source,
221 meta,
222 &mut batch,
223 chunk_threshold,
224 chunk_size,
225 &mut result,
226 )?;
227 }
228
229 Ok(result)
230}
231
232fn flush_batch(
233 conn: &Connection,
234 source: &str,
235 meta: Option<&str>,
236 batch: &mut Vec<String>,
237 chunk_threshold: usize,
238 chunk_size: usize,
239 result: &mut IngestBatchResult,
240) -> io::Result<()> {
241 let tx = conn.unchecked_transaction().map_err(io::Error::other)?;
242 let content_refs: Vec<&str> = batch.iter().map(String::as_str).collect();
243 let receipts = log::append_batch_with_receipts_in_tx(&tx, source, &content_refs, meta)
244 .map_err(io::Error::other)?;
245
246 for (content, receipt) in batch.iter().zip(receipts.into_iter()) {
247 let event_id = receipt.id;
248 result.event_ids.push(event_id.clone());
249
250 if chunk_size > 0 && content.len() > chunk_threshold {
251 let chunk_count =
252 chunk::create_chunks(&tx, &event_id, content, receipt.timestamp, chunk_size)
253 .map_err(io::Error::other)?;
254 if chunk_count > 0 {
255 result.chunked_events += 1;
256 result.total_chunks += chunk_count;
257 }
258 }
259 }
260
261 tx.commit().map_err(io::Error::other)?;
262 batch.clear();
263 Ok(())
264}