insightful/
lib.rs

1use parking_lot::RwLock;
2use serde::Deserialize;
3use std::{
4    collections::BTreeMap,
5    error::Error,
6    fmt::{Debug, Display},
7    fs::File,
8    io::{BufRead, BufReader, Write},
9    path::PathBuf,
10    sync::Arc,
11    time::Duration,
12};
13use tantivy::{
14    Document, Index, IndexReader, IndexWriter, TantivyDocument,
15    directory::MmapDirectory,
16    query::QueryParser,
17    schema::{DateOptions, INDEXED, STORED, STRING, Schema, TEXT},
18};
19use tokio::task::JoinHandle;
20use tracing::instrument;
21
/// One span object from a tracing-style JSON log line.
///
/// Only `domain` is deserialized; serde ignores unknown keys by default.
/// The commented-out fields appear to record other span keys seen in the
/// logs that are not currently needed — TODO confirm against the producer.
#[derive(Deserialize)]
struct LogSpan {
    // name: String,

    // env: Option<String>,
    /// Domain this span belongs to; `Manager::index` uses the first span
    /// with a `domain` to route the whole line to that domain's index.
    domain: Option<String>,
    // id: Option<String>,

    // i: Option<u8>,
    // nm: Option<String>,

    // trigger: Option<String>,

    // flavor: Option<String>,
}
37
/// The `fields` object of a tracing-style JSON log line.
///
/// Every field is commented out, so this currently deserializes as an
/// empty struct, and `LogLine.fields` is likewise commented out — nothing
/// in this file uses `LogFields` right now. Presumably kept as a record
/// of the keys the log producer emits — TODO confirm before deleting.
#[derive(Deserialize)]
struct LogFields {
    // message: Option<String>,

    // ip: Option<String>,
    // version: Option<String>,
    // method: Option<String>,
    // uri: Option<String>,
    // headers: Option<String>,
    // status: Option<u16>,

    // token: Option<String>,

    // ts: Option<String>,
    // path: Option<String>,
    // query: Option<String>,
    // correlation: Option<String>,
    // flags: Option<String>,
}
57
/// Top level of one JSON log line.
///
/// Only `spans` is deserialized — `Manager::index` scans it for the first
/// span carrying a `domain`. `timestamp`, `level`, and `fields` stay
/// commented out because the raw JSON line itself is handed to
/// `TantivyDocument::parse_json`, which extracts them against the schema.
#[derive(Deserialize)]
struct LogLine {
    // timestamp: String,
    // level: String,
    // fields: LogFields,
    spans: Option<Vec<LogSpan>>,
}
65
/// Output shape requested for `Manager::search`.
///
/// With `rename_all = "camelCase"` the variants deserialize from the
/// strings "all", "count", and "top" — the same strings the `Display`
/// and `Debug` impls print.
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum QueryFormat {
    /// Every matching document, rendered as a JSON array.
    All,
    /// Only the number of matches, as a decimal string.
    Count,
    /// The top-scoring documents; `Manager::search` requires `limit` for
    /// this variant (otherwise it returns an empty string).
    Top,
}
73
74impl QueryFormat {
75    fn format(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            &Self::All => write!(f, "all"),
78            &Self::Count => write!(f, "count"),
79            &Self::Top => write!(f, "top"),
80        }
81    }
82}
83
84impl Display for QueryFormat {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        self.format(f)
87    }
88}
89
90impl Debug for QueryFormat {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        self.format(f)
93    }
94}
95
/// Owns the per-domain tantivy indexes and the background tasks that
/// populate them from log files under `dir`.
pub struct Manager {
    /// Root directory: log files live directly inside it; the cursor state
    /// goes in `<dir>/cursor/` and indexes in `<dir>/search/<domain>/`.
    dir: PathBuf,

    // todo: needs more than just static interval,
    // todo: but this gets *something* for now.
    /// Seconds between indexing passes (see `start`).
    index_interval_s: u64,
    /// Retention knob for the cleanup task — currently only logged (stub).
    max_ttl_days: u16,
    /// Retention knob for the cleanup task — currently only logged (stub).
    max_lines: u32,
    /// Seconds between cleanup passes (cleanup itself is still a todo).
    cleanup_interval_s: u64,

    /// domain -> (schema, index, reader, query parser), shared with the
    /// indexing task spawned by `start`.
    domain_indexes: Arc<RwLock<BTreeMap<String, (Schema, Index, IndexReader, QueryParser)>>>,
}
108
impl Manager {
    /// Builds a manager rooted at `dir`. No indexes are opened until
    /// `add_domain` is called, and no background work runs until `start`.
    pub fn new(
        dir: PathBuf,

        index_interval_s: u64,
        max_ttl_days: u16,
        max_lines: u32,
        cleanup_interval_s: u64,
    ) -> Self {
        Manager {
            dir,

            index_interval_s,
            max_ttl_days,
            max_lines,
            cleanup_interval_s,

            domain_indexes: Arc::new(RwLock::new(BTreeMap::new())),
        }
    }

    /// Creates (or reopens, via `Index::open_or_create`) the tantivy index
    /// for `domain` under `<dir>/search/<domain>` and registers
    /// (schema, index, reader, query parser) in `domain_indexes`, replacing
    /// any previous entry for the same domain.
    ///
    /// The schema mirrors a JSON log line: a stored, fast, microsecond
    /// `timestamp` date field, a raw-string `level`, and JSON `fields` and
    /// `spans` (all stored).
    #[instrument(skip(self), err)]
    pub fn add_domain(&self, domain: String) -> Result<(), Box<dyn Error>> {
        let mut schema_builder = Schema::builder();

        let ts_opts = DateOptions::from(INDEXED)
            .set_stored()
            .set_fast()
            .set_precision(tantivy::schema::DateTimePrecision::Microseconds);

        // todo: don't store the values, just index them and store
        // todo: logs in separate file, compressed.
        //
        // ?? store file path and log line in the index
        let timestamp = schema_builder.add_date_field("timestamp", ts_opts);
        let level = schema_builder.add_text_field("level", STRING | STORED);
        let fields = schema_builder.add_json_field("fields", TEXT | STORED);
        let spans = schema_builder.add_json_field("spans", TEXT | STORED);

        let schema = schema_builder.build();

        let index_dir = self.dir.join("search").join(&domain);
        std::fs::create_dir_all(&index_dir)?;

        let dir = MmapDirectory::open(index_dir)?;
        let index = Index::open_or_create(dir, schema.clone())?;

        // OnCommitWithDelay: searchers pick up each commit automatically
        // (shortly after it lands) without manual reload calls.
        let reader = index
            .reader_builder()
            .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);

        // All four fields are default fields, so bare query terms match
        // any of timestamp/level/fields/spans.
        let query_parser = QueryParser::for_index(&index, vec![timestamp, level, fields, spans]);

        let mut di = self.domain_indexes.write();
        di.insert(domain, (schema, index, reader.try_into()?, query_parser));

        Ok(())
    }

    /// Runs `query` against the index registered for `domain` and renders
    /// the result per `format`:
    /// - `All`:   JSON array of every matching document
    /// - `Count`: the match count as a decimal string
    /// - `Top`:   JSON array of the `limit` top-scoring documents
    ///
    /// Returns an empty string when the domain is unknown, or when `Top`
    /// is requested without a `limit` — callers cannot distinguish "no
    /// such domain" from "no results rendered".
    #[instrument(skip(self), err)]
    pub fn search(
        &self,
        domain: String,
        query: String,
        format: QueryFormat,
        limit: Option<usize>,
    ) -> Result<String, Box<dyn Error>> {
        let di = self.domain_indexes.read();
        let mut out = String::new();

        if let Some((schema, _, reader, parser)) = di.get(&domain) {
            let query = parser.parse_query(&query)?;
            let searcher = reader.searcher();

            match format {
                QueryFormat::All => {
                    // DocSetCollector: every match, unscored and unordered.
                    let docs = searcher.search(&*query, &tantivy::collector::DocSetCollector)?;

                    let mut lines = vec![];
                    tracing::info!(count = docs.len());

                    for doc_addr in docs {
                        let doc: TantivyDocument = searcher.doc(doc_addr)?;
                        let line = doc.to_json(&schema);
                        lines.push(line);
                    }

                    // Docs are already JSON objects; join into a JSON array.
                    out = format!("[{}]", lines.join(","))
                }
                QueryFormat::Count => {
                    let count = searcher.search(&*query, &tantivy::collector::Count)?;

                    out = count.to_string();
                }
                QueryFormat::Top => {
                    // Silently yields "" when limit is None — see doc above.
                    if let Some(limit) = limit {
                        let docs = searcher
                            .search(&*query, &tantivy::collector::TopDocs::with_limit(limit))?;

                        let mut lines = vec![];
                        tracing::info!(count = docs.len());

                        // TopDocs yields (score, address); score is dropped.
                        for (_, doc_addr) in docs {
                            let doc: TantivyDocument = searcher.doc(doc_addr)?;
                            let line = doc.to_json(&schema);
                            lines.push(line);
                        }

                        out = format!("[{}]", lines.join(","))
                    }
                }
            }
        }

        Ok(out)
    }

    /// One incremental indexing pass: reads log files directly under `dir`
    /// (sorted by path, subdirectories skipped), resumes from the file/line
    /// cursor persisted under `<dir>/cursor/`, routes each JSON line to the
    /// per-domain writer chosen by the first span carrying a `domain`
    /// (lines with no registered domain are dropped), then persists the new
    /// cursor and commits every writer.
    ///
    /// Associated fn rather than a method so the task spawned by `start`
    /// can call it with its own clones of `dir` and `domain_indexes`.
    ///
    /// NOTE(review): any single unreadable/unparsable line aborts the whole
    /// pass via `?` — confirm that fail-fast is intended over skip-and-log.
    #[instrument(skip(dir, domain_indexes), err)]
    fn index(
        dir: &PathBuf,
        domain_indexes: &Arc<RwLock<BTreeMap<String, (Schema, Index, IndexReader, QueryParser)>>>,
    ) -> Result<(), Box<dyn Error>> {
        tracing::info!("indexing");

        let cursor_path = dir.join("cursor");

        // File cursor: name of the last log file processed. Fallback is
        // `dir/.`, which sorts before every real entry so nothing is skipped.
        let file_cursor_path = cursor_path.join("file");
        let file_cursor = match std::fs::read(&file_cursor_path) {
            Ok(file) => dir.join(std::str::from_utf8(&file)?),
            Err(_) => dir.join("."),
        };

        // Line cursor: index of the last line processed in that file.
        let line_cursor_path = cursor_path.join("line");
        let line_cursor = match std::fs::read(&line_cursor_path) {
            Ok(file) => std::str::from_utf8(&file)?.parse::<usize>()?,
            Err(_) => 0,
        };

        tracing::info!(file_cursor = %file_cursor.display(), line_cursor);

        let mut log_files = std::fs::read_dir(&dir)?
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, std::io::Error>>()?;
        // Sorted so the `<` comparison against the file cursor below is a
        // meaningful "already processed" test.
        log_files.sort();

        // Read lock held for the whole pass; add_domain blocks meanwhile.
        let di = domain_indexes.read();

        let mut writer_map = BTreeMap::new();

        // One writer (15 MB heap) per registered domain.
        for (domain, (schema, index, _, _)) in di.iter() {
            let writer: IndexWriter = index.writer(15_000_000)?;

            writer_map.insert(domain, (schema, writer));
        }

        let mut last_log_file = file_cursor.clone();
        let mut last_log_line = line_cursor;

        for log_file in log_files {
            if log_file < *file_cursor || log_file.is_dir() {
                continue;
            }

            tracing::info!(file = %log_file.display());

            last_log_file = log_file.clone();

            let file = File::open(&log_file)?;

            let reader = BufReader::new(file);

            for (i, line) in reader.lines().enumerate() {
                // Resuming inside the cursor file: skip already-seen lines.
                // NOTE(review): `line_cursor` holds the index of the LAST
                // line already processed, but only `i < line_cursor` is
                // skipped — the line at index `line_cursor` is re-indexed
                // on every pass (duplicate document). Confirm whether the
                // cursor should store "next line" or the test be `<=`.
                if log_file == *file_cursor {
                    if i < line_cursor {
                        continue;
                    }
                }

                let line = line?;
                let log = serde_json::from_str::<LogLine>(&line)?;

                // First span with a domain decides which index gets the line.
                let mut log_domain = String::new();

                if let Some(spans) = log.spans {
                    for span in spans {
                        if let Some(domain) = span.domain {
                            log_domain = domain;
                            break;
                        }
                    }
                }

                // Unknown/empty domain => no writer in the map => line dropped.
                if let Some((schema, writer)) = writer_map.get(&log_domain) {
                    let doc = TantivyDocument::parse_json(&schema, &line)?;

                    writer.add_document(doc)?;
                }

                last_log_line = i;
            }

            tracing::info!(lines = last_log_line + 1);
        }

        tracing::info!(file_cursor = %last_log_file.display(), line_cursor = last_log_line);

        // NOTE(review): cursors are persisted BEFORE the writers commit; if
        // a commit below fails, the next pass resumes past lines that were
        // never committed (they are lost). Consider committing first.
        //
        // NOTE(review): with no cursor file and no log files seen,
        // `last_log_file` is still `dir/.`, whose `file_name()` appears to
        // resolve to the last component of `dir` itself (Path normalizes a
        // trailing `.`), so the directory name gets written as the file
        // cursor — confirm this case is harmless.
        let mut file_cursor = File::create(&file_cursor_path)?;
        file_cursor.write_all(
            last_log_file
                .file_name()
                .unwrap()
                .to_str()
                .unwrap()
                .as_bytes(),
        )?;

        let mut line_cursor = File::create(&line_cursor_path)?;
        line_cursor.write_all(last_log_line.to_string().as_bytes())?;

        for (_, (_, mut writer)) in writer_map {
            writer.commit()?;
        }

        Ok(())
    }

    /// Spawns the periodic indexing task and the (currently stub) cleanup
    /// task, returning their join handles. Both loop forever; dropping the
    /// handles detaches the tasks.
    ///
    /// NOTE(review): `Self::index` does blocking file and tantivy work on a
    /// tokio worker thread each tick — consider `spawn_blocking`. Also, a
    /// pass that outlives `index_interval_s` is simply followed immediately
    /// by the next sleep; passes never overlap (single task).
    pub fn start(&self) -> Result<(JoinHandle<()>, JoinHandle<()>), Box<dyn Error>> {
        let cursor_path = self.dir.join("cursor");
        std::fs::create_dir_all(&cursor_path)?;

        // Clone everything the 'static tasks need.
        let index_interval_s = self.index_interval_s;
        let cleanup_interval_s = self.cleanup_interval_s;
        let max_lines = self.max_lines;
        let max_ttl_days = self.max_ttl_days;
        let dir = self.dir.clone();
        let domain_indexes = self.domain_indexes.clone();

        // ?? a version of this could use notify to watch for new logs or log files
        // - https://docs.rs/notify/latest/notify/

        let index_handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(index_interval_s)).await;

                // Errors are logged, not fatal: the loop keeps ticking.
                match Self::index(&dir, &domain_indexes) {
                    Ok(()) => tracing::info!("indexed"),
                    Err(err) => tracing::error!(%err),
                };
            }
        });

        let cleanup_handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(cleanup_interval_s)).await;
                // Cleanup itself is not implemented yet; knobs are just logged.
                tracing::info!(max_lines, max_ttl_days, "todo: cleanup");
            }
        });

        Ok((index_handle, cleanup_handle))
    }
}