1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
6use super::repository::QueryState;
7
/// A searchable text field of an item's search document.
///
/// Each variant maps to one column shared by `item_search_fts` and
/// `item_search_documents` (see `SearchField::label`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchField {
    /// The `raw_text` column (presumably the text as captured — confirm).
    Raw,
    /// The `derived_text` column (presumably enrichment output — confirm).
    Derived,
    /// The `tags_text` column.
    Tags,
}
14
/// How [`search_items`] interprets its query string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMatchMode {
    /// The query is used verbatim as an FTS5 `MATCH` expression; hits are ranked by `bm25`.
    Fts,
    /// Each whitespace-separated word becomes a quoted FTS5 prefix term (`"word"*`).
    Prefix,
    /// Substring match via `instr(lower(..), lower(..))`; no ranking, newest first.
    Contains,
}
21
22impl SearchField {
23 fn fts_column(self) -> &'static str {
24 match self {
25 Self::Raw => "raw_text",
26 Self::Derived => "derived_text",
27 Self::Tags => "tags_text",
28 }
29 }
30
31 pub fn label(self) -> &'static str {
32 self.fts_column()
33 }
34}
35
/// One search hit returned by [`search_items`].
#[derive(Debug, Clone, Serialize)]
pub struct SearchItem {
    /// `inbox_items.item_id` of the matching item.
    pub item_id: i64,
    /// `inbox_items.created_at`, returned exactly as stored.
    pub created_at: String,
    /// FTS5 `bm25` rank for `Fts`/`Prefix` searches (lower is a better match);
    /// always `0.0` for `Contains` searches.
    pub score: f64,
    /// Labels of the fields that were searched (the normalized request).
    /// This is not a per-hit record of which fields actually matched.
    pub matched_fields: Vec<String>,
    /// First 120 characters of the derived text.
    /// Falls back to the item's raw text when the derived text is NULL.
    pub preview: String,
    /// `$.content_type` from the payload of the item's latest active, accepted
    /// derivation, if there is one.
    pub content_type: Option<String>,
    /// `$.validation_status` from the same derivation payload, if present.
    pub validation_status: Option<String>,
}
46
/// Fixed reporting windows accepted by [`report_summary`].
///
/// Both windows end at the current time, computed by SQLite in UTC.
///
/// `PartialEq`/`Eq` are derived for parity with the other selector enums in
/// this module (`SearchField`, `SearchMatchMode`), so callers can compare periods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportPeriod {
    /// The trailing seven days up to now.
    Week,
    /// From the start of the current calendar month up to now.
    Month,
}
52
/// Input for [`report_summary_with_range`]: an explicit reporting window.
#[derive(Debug, Clone)]
pub struct ReportRangeQuery {
    /// Label echoed back as `ReportSummary::period` (e.g. `"week"`, `"month"`).
    pub period: String,
    /// Inclusive lower bound, compared with `created_at` through SQLite `julianday()`.
    pub from: String,
    /// Inclusive upper bound, compared the same way as `from`.
    pub to: String,
    /// Label echoed back in `ReportRange::timezone`; it does not affect filtering.
    pub timezone: String,
}
60
/// A label and the number of rows grouped under it in a report breakdown.
#[derive(Debug, Clone, Serialize)]
pub struct NameCount {
    /// Group label: a category, tag, content type or validation status.
    pub name: String,
    /// Number of rows in the group, as returned by SQL `count(*)`.
    pub count: i64,
}
66
/// The window a [`ReportSummary`] covers, echoed from the request.
#[derive(Debug, Clone, Serialize)]
pub struct ReportRange {
    /// Inclusive lower bound, as supplied (not normalized).
    pub from: String,
    /// Inclusive upper bound, as supplied (not normalized).
    pub to: String,
    /// Timezone label from the request (`"UTC"` when built by `report_summary`).
    pub timezone: String,
}
73
/// Item counts for the report window.
#[derive(Debug, Clone, Serialize)]
pub struct ReportTotals {
    /// Items whose `created_at` falls inside the window.
    pub captured: i64,
    /// Distinct items in the window with at least one active, accepted derivation.
    pub enriched: i64,
    /// `captured - enriched`, clamped so it is never negative.
    pub pending: i64,
}
80
/// Aggregated report produced by [`report_summary`] / [`report_summary_with_range`].
#[derive(Debug, Clone, Serialize)]
pub struct ReportSummary {
    /// Period label from the request (e.g. `"week"`, `"month"`).
    pub period: String,
    /// Window the report covers.
    pub range: ReportRange,
    /// Captured / enriched / pending item counts.
    pub totals: ReportTotals,
    /// Up to five most frequent categories (blank ones reported as `"uncategorized"`).
    pub top_categories: Vec<NameCount>,
    /// Up to five most frequent tag names.
    pub top_tags: Vec<NameCount>,
    /// Up to seven most frequent content types (missing ones reported as `"unknown"`).
    pub top_content_types: Vec<NameCount>,
    /// Count for every validation status (missing ones reported as `"unknown"`).
    pub validation_status_totals: Vec<NameCount>,
}
91
92pub fn search_items(
93 conn: &Connection,
94 query: &str,
95 state: QueryState,
96 fields: &[SearchField],
97 match_mode: SearchMatchMode,
98 limit: usize,
99) -> Result<Vec<SearchItem>, AppError> {
100 let fields = normalize_search_fields(fields);
101 let state_filter = state_filter_sql(state);
102 let matched_fields = fields
103 .iter()
104 .map(|field| field.label().to_string())
105 .collect::<Vec<_>>();
106
107 match match_mode {
108 SearchMatchMode::Fts => {
109 search_items_fts(conn, query, &fields, state_filter, &matched_fields, limit)
110 }
111 SearchMatchMode::Prefix => {
112 let prefix_query = build_prefix_query(query);
113 search_items_fts(
114 conn,
115 &prefix_query,
116 &fields,
117 state_filter,
118 &matched_fields,
119 limit,
120 )
121 }
122 SearchMatchMode::Contains => {
123 search_items_contains(conn, query, &fields, state_filter, &matched_fields, limit)
124 }
125 }
126}
127
128fn normalize_search_fields(fields: &[SearchField]) -> Vec<SearchField> {
129 let mut out = Vec::new();
130 let source = if fields.is_empty() {
131 &[SearchField::Raw, SearchField::Derived, SearchField::Tags][..]
132 } else {
133 fields
134 };
135
136 for field in source {
137 if !out.contains(field) {
138 out.push(*field);
139 }
140 }
141
142 out
143}
144
145fn build_scoped_query(query: &str, fields: &[SearchField]) -> String {
146 let columns = fields
147 .iter()
148 .map(|field| field.fts_column())
149 .collect::<Vec<_>>()
150 .join(" ");
151 format!("{{{columns}}}: ({query})")
152}
153
154fn state_filter_sql(state: QueryState) -> &'static str {
155 match state {
156 QueryState::All => "1 = 1",
157 QueryState::Pending => {
158 "not exists (
159 select 1 from item_derivations d
160 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
161 )"
162 }
163 QueryState::Enriched => {
164 "exists (
165 select 1 from item_derivations d
166 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
167 )"
168 }
169 }
170}
171
172fn search_items_fts(
173 conn: &Connection,
174 query: &str,
175 fields: &[SearchField],
176 state_filter: &str,
177 matched_fields: &[String],
178 limit: usize,
179) -> Result<Vec<SearchItem>, AppError> {
180 let scoped_query = build_scoped_query(query, fields);
181 let sql = format!(
182 "select
183 i.item_id,
184 i.created_at,
185 bm25(item_search_fts) as score,
186 substr(coalesce(doc.derived_text, i.raw_text), 1, 120) as preview,
187 json_extract(ad.payload_json, '$.content_type') as content_type,
188 json_extract(ad.payload_json, '$.validation_status') as validation_status
189 from item_search_fts
190 join item_search_documents doc on doc.item_id = item_search_fts.rowid
191 join inbox_items i on i.item_id = doc.item_id
192 left join item_derivations ad
193 on ad.derivation_id = (
194 select d.derivation_id
195 from item_derivations d
196 where d.item_id = i.item_id
197 and d.is_active = 1
198 and d.status = 'accepted'
199 order by d.derivation_version desc, d.derivation_id desc
200 limit 1
201 )
202 where item_search_fts match ?1
203 and {state_filter}
204 order by score asc, i.created_at desc, i.item_id desc
205 limit ?2"
206 );
207 let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
208 let rows = stmt
209 .query_map(params![scoped_query, limit as i64], |row| {
210 Ok(SearchItem {
211 item_id: row.get(0)?,
212 created_at: row.get(1)?,
213 score: row.get(2)?,
214 matched_fields: matched_fields.to_vec(),
215 preview: row.get(3)?,
216 content_type: row.get(4)?,
217 validation_status: row.get(5)?,
218 })
219 })
220 .map_err(AppError::db_query)?;
221
222 rows.collect::<Result<Vec<_>, _>>()
223 .map_err(AppError::db_query)
224}
225
226fn search_items_contains(
227 conn: &Connection,
228 query: &str,
229 fields: &[SearchField],
230 state_filter: &str,
231 matched_fields: &[String],
232 limit: usize,
233) -> Result<Vec<SearchItem>, AppError> {
234 let contains_filter = build_contains_filter(fields);
235 let sql = format!(
236 "select
237 i.item_id,
238 i.created_at,
239 0.0 as score,
240 substr(coalesce(doc.derived_text, i.raw_text), 1, 120) as preview,
241 json_extract(ad.payload_json, '$.content_type') as content_type,
242 json_extract(ad.payload_json, '$.validation_status') as validation_status
243 from item_search_documents doc
244 join inbox_items i on i.item_id = doc.item_id
245 left join item_derivations ad
246 on ad.derivation_id = (
247 select d.derivation_id
248 from item_derivations d
249 where d.item_id = i.item_id
250 and d.is_active = 1
251 and d.status = 'accepted'
252 order by d.derivation_version desc, d.derivation_id desc
253 limit 1
254 )
255 where ({contains_filter})
256 and {state_filter}
257 order by i.created_at desc, i.item_id desc
258 limit ?2"
259 );
260 let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
261 let rows = stmt
262 .query_map(params![query, limit as i64], |row| {
263 Ok(SearchItem {
264 item_id: row.get(0)?,
265 created_at: row.get(1)?,
266 score: row.get(2)?,
267 matched_fields: matched_fields.to_vec(),
268 preview: row.get(3)?,
269 content_type: row.get(4)?,
270 validation_status: row.get(5)?,
271 })
272 })
273 .map_err(AppError::db_query)?;
274
275 rows.collect::<Result<Vec<_>, _>>()
276 .map_err(AppError::db_query)
277}
278
279fn build_contains_filter(fields: &[SearchField]) -> String {
280 fields
281 .iter()
282 .map(|field| {
283 format!(
284 "instr(lower(coalesce(doc.{column}, '')), lower(?1)) > 0",
285 column = field.fts_column()
286 )
287 })
288 .collect::<Vec<_>>()
289 .join(" or ")
290}
291
292fn build_prefix_query(query: &str) -> String {
293 let tokens = query
294 .split_whitespace()
295 .map(str::trim)
296 .filter(|token| !token.is_empty())
297 .map(prefix_token)
298 .collect::<Vec<_>>();
299
300 if tokens.is_empty() {
301 query.to_string()
302 } else {
303 tokens.join(" ")
304 }
305}
306
/// Turns one word into a quoted FTS5 prefix term, e.g. `foo` -> `"foo"*`.
///
/// Trailing `*` characters typed by the user are dropped, and a single `*` is
/// re-added after the closing quote. Embedded `"` characters are doubled, so the
/// word is always parsed as a literal string rather than as FTS5 syntax.
fn prefix_token(token: &str) -> String {
    let word = token.trim_end_matches('*');
    let mut term = String::with_capacity(word.len() + 3);
    term.push('"');
    for ch in word.chars() {
        match ch {
            '"' => term.push_str("\"\""),
            other => term.push(other),
        }
    }
    term.push_str("\"*");
    term
}
312
313pub fn report_summary(conn: &Connection, period: ReportPeriod) -> Result<ReportSummary, AppError> {
314 let (period_name, from_sql, to_sql) = match period {
315 ReportPeriod::Week => (
316 "week",
317 "strftime('%Y-%m-%dT%H:%M:%fZ', 'now', '-7 days')",
318 "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
319 ),
320 ReportPeriod::Month => (
321 "month",
322 "strftime('%Y-%m-%dT%H:%M:%fZ', 'now', 'start of month')",
323 "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
324 ),
325 };
326
327 let (from, to): (String, String) = conn
328 .query_row(&format!("select {from_sql}, {to_sql}"), [], |row| {
329 Ok((row.get(0)?, row.get(1)?))
330 })
331 .map_err(AppError::db_query)?;
332
333 let query = ReportRangeQuery {
334 period: period_name.to_string(),
335 from,
336 to,
337 timezone: "UTC".to_string(),
338 };
339 report_summary_with_range(conn, &query)
340}
341
342pub fn report_summary_with_range(
343 conn: &Connection,
344 query: &ReportRangeQuery,
345) -> Result<ReportSummary, AppError> {
346 let from = &query.from;
347 let to = &query.to;
348
349 let captured: i64 = conn
350 .query_row(
351 "select count(*)
352 from inbox_items
353 where julianday(created_at) >= julianday(?1)
354 and julianday(created_at) <= julianday(?2)",
355 params![from, to],
356 |row| row.get(0),
357 )
358 .map_err(AppError::db_query)?;
359
360 let enriched: i64 = conn
361 .query_row(
362 "select count(distinct i.item_id)
363 from inbox_items i
364 join item_derivations d on d.item_id = i.item_id
365 where d.is_active = 1
366 and d.status = 'accepted'
367 and julianday(i.created_at) >= julianday(?1)
368 and julianday(i.created_at) <= julianday(?2)",
369 params![from, to],
370 |row| row.get(0),
371 )
372 .map_err(AppError::db_query)?;
373
374 let pending = (captured - enriched).max(0);
375 let top_categories = collect_top_categories(conn, from, to)?;
376 let top_tags = collect_top_tags(conn, from, to)?;
377 let top_content_types = collect_top_content_types(conn, from, to)?;
378 let validation_status_totals = collect_validation_status_totals(conn, from, to)?;
379
380 Ok(ReportSummary {
381 period: query.period.clone(),
382 range: ReportRange {
383 from: from.clone(),
384 to: to.clone(),
385 timezone: query.timezone.clone(),
386 },
387 totals: ReportTotals {
388 captured,
389 enriched,
390 pending,
391 },
392 top_categories,
393 top_tags,
394 top_content_types,
395 validation_status_totals,
396 })
397}
398
399fn collect_top_categories(
400 conn: &Connection,
401 from: &str,
402 to: &str,
403) -> Result<Vec<NameCount>, AppError> {
404 let mut stmt = conn
405 .prepare(
406 "select coalesce(nullif(trim(d.category), ''), 'uncategorized') as category_name,
407 count(*) as category_count
408 from item_derivations d
409 join inbox_items i on i.item_id = d.item_id
410 where d.is_active = 1
411 and d.status = 'accepted'
412 and julianday(i.created_at) >= julianday(?1)
413 and julianday(i.created_at) <= julianday(?2)
414 group by category_name
415 order by category_count desc, category_name asc
416 limit 5",
417 )
418 .map_err(AppError::db_query)?;
419
420 let rows = stmt
421 .query_map(params![from, to], |row| {
422 Ok(NameCount {
423 name: row.get(0)?,
424 count: row.get(1)?,
425 })
426 })
427 .map_err(AppError::db_query)?;
428
429 rows.collect::<Result<Vec<_>, _>>()
430 .map_err(AppError::db_query)
431}
432
433fn collect_top_tags(conn: &Connection, from: &str, to: &str) -> Result<Vec<NameCount>, AppError> {
434 let mut stmt = conn
435 .prepare(
436 "select t.tag_name, count(*) as tag_count
437 from item_tags it
438 join tags t on t.tag_id = it.tag_id
439 join item_derivations d on d.derivation_id = it.derivation_id
440 join inbox_items i on i.item_id = d.item_id
441 where d.is_active = 1
442 and d.status = 'accepted'
443 and julianday(i.created_at) >= julianday(?1)
444 and julianday(i.created_at) <= julianday(?2)
445 group by t.tag_name
446 order by tag_count desc, t.tag_name asc
447 limit 5",
448 )
449 .map_err(AppError::db_query)?;
450
451 let rows = stmt
452 .query_map(params![from, to], |row| {
453 Ok(NameCount {
454 name: row.get(0)?,
455 count: row.get(1)?,
456 })
457 })
458 .map_err(AppError::db_query)?;
459
460 rows.collect::<Result<Vec<_>, _>>()
461 .map_err(AppError::db_query)
462}
463
464fn collect_top_content_types(
465 conn: &Connection,
466 from: &str,
467 to: &str,
468) -> Result<Vec<NameCount>, AppError> {
469 let mut stmt = conn
470 .prepare(
471 "select coalesce(nullif(trim(json_extract(d.payload_json, '$.content_type')), ''), 'unknown') as content_type,
472 count(*) as content_type_count
473 from item_derivations d
474 join inbox_items i on i.item_id = d.item_id
475 where d.is_active = 1
476 and d.status = 'accepted'
477 and julianday(i.created_at) >= julianday(?1)
478 and julianday(i.created_at) <= julianday(?2)
479 group by content_type
480 order by content_type_count desc, content_type asc
481 limit 7",
482 )
483 .map_err(AppError::db_query)?;
484
485 let rows = stmt
486 .query_map(params![from, to], |row| {
487 Ok(NameCount {
488 name: row.get(0)?,
489 count: row.get(1)?,
490 })
491 })
492 .map_err(AppError::db_query)?;
493
494 rows.collect::<Result<Vec<_>, _>>()
495 .map_err(AppError::db_query)
496}
497
498fn collect_validation_status_totals(
499 conn: &Connection,
500 from: &str,
501 to: &str,
502) -> Result<Vec<NameCount>, AppError> {
503 let mut stmt = conn
504 .prepare(
505 "select coalesce(nullif(trim(json_extract(d.payload_json, '$.validation_status')), ''), 'unknown') as validation_status,
506 count(*) as validation_status_count
507 from item_derivations d
508 join inbox_items i on i.item_id = d.item_id
509 where d.is_active = 1
510 and d.status = 'accepted'
511 and julianday(i.created_at) >= julianday(?1)
512 and julianday(i.created_at) <= julianday(?2)
513 group by validation_status
514 order by validation_status_count desc, validation_status asc",
515 )
516 .map_err(AppError::db_query)?;
517
518 let rows = stmt
519 .query_map(params![from, to], |row| {
520 Ok(NameCount {
521 name: row.get(0)?,
522 count: row.get(1)?,
523 })
524 })
525 .map_err(AppError::db_query)?;
526
527 rows.collect::<Result<Vec<_>, _>>()
528 .map_err(AppError::db_query)
529}