use parking_lot::RwLock;
use serde::Deserialize;
use std::{
    collections::BTreeMap,
    error::Error,
    fmt::{Debug, Display},
    fs::File,
    io::{BufRead, BufReader, Write},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
use tantivy::{
    Document, Index, IndexReader, IndexWriter, TantivyDocument,
    directory::MmapDirectory,
    query::QueryParser,
    schema::{DateOptions, INDEXED, STORED, STRING, Schema, TEXT},
};
use tokio::task::JoinHandle;
use tracing::instrument;

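/// One entry in a log line's span stack; only `domain` is read here, to route
/// the line to the matching per-domain index.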
#[derive(Deserialize)]
struct LogSpan {
    domain: Option<String>,
}

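/// The structured `fields` payload of a log line.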
#[derive(Deserialize)]
struct LogFields {}

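/// A single JSON log line; `spans` carries the span stack the event was emitted under.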
#[derive(Deserialize)]
struct LogLine {
    spans: Option<Vec<LogSpan>>,
}

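/// Output shape for `Manager::search`: every matching document, a match count,
/// or the top `limit` hits.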
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum QueryFormat {
    All,
    Count,
    Top,
}

impl QueryFormat {
    fn format(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::All => write!(f, "all"),
            Self::Count => write!(f, "count"),
            Self::Top => write!(f, "top"),
        }
    }
}

impl Display for QueryFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.format(f)
    }
}

impl Debug for QueryFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.format(f)
    }
}

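/// Owns one tantivy index per domain (under `<dir>/search/<domain>`), periodically
/// indexes the JSON log files found in `dir`, and answers search queries against the
/// per-domain indexes.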
pub struct Manager {
    dir: PathBuf,

    index_interval_s: u64,
    max_ttl_days: u16,
    max_lines: u32,
    cleanup_interval_s: u64,

    domain_indexes: Arc<RwLock<BTreeMap<String, (Schema, Index, IndexReader, QueryParser)>>>,
}

impl Manager {
    pub fn new(
        dir: PathBuf,

        index_interval_s: u64,
        max_ttl_days: u16,
        max_lines: u32,
        cleanup_interval_s: u64,
    ) -> Self {
        Manager {
            dir,

            index_interval_s,
            max_ttl_days,
            max_lines,
            cleanup_interval_s,

            domain_indexes: Arc::new(RwLock::new(BTreeMap::new())),
        }
    }

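    /// Creates (or reopens) the tantivy index for `domain` under
    /// `<dir>/search/<domain>` and registers its schema, reader, and query parser.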
    #[instrument(skip(self), err)]
    pub fn add_domain(&self, domain: String) -> Result<(), Box<dyn Error>> {
        let mut schema_builder = Schema::builder();

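        // Timestamps are indexed, stored, and kept as a fast (columnar) field at
        // microsecond precision.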
        let ts_opts = DateOptions::from(INDEXED)
            .set_stored()
            .set_fast()
            .set_precision(tantivy::schema::DateTimePrecision::Microseconds);

        let timestamp = schema_builder.add_date_field("timestamp", ts_opts);
        let level = schema_builder.add_text_field("level", STRING | STORED);
        let fields = schema_builder.add_json_field("fields", TEXT | STORED);
        let spans = schema_builder.add_json_field("spans", TEXT | STORED);

        let schema = schema_builder.build();

        let index_dir = self.dir.join("search").join(&domain);
        std::fs::create_dir_all(&index_dir)?;

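        // Reuse the on-disk index if it already exists, otherwise create it with this schema.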
        let dir = MmapDirectory::open(index_dir)?;
        let index = Index::open_or_create(dir, schema.clone())?;

        let reader = index
            .reader_builder()
            .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);

        let query_parser = QueryParser::for_index(&index, vec![timestamp, level, fields, spans]);

        let mut di = self.domain_indexes.write();
        di.insert(domain, (schema, index, reader.try_into()?, query_parser));

        Ok(())
    }

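    /// Runs `query` against the index for `domain` and renders the hits according to
    /// `format`. An unknown domain, or `Top` without a `limit`, yields an empty string.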
    #[instrument(skip(self), err)]
    pub fn search(
        &self,
        domain: String,
        query: String,
        format: QueryFormat,
        limit: Option<usize>,
    ) -> Result<String, Box<dyn Error>> {
        let di = self.domain_indexes.read();
        let mut out = String::new();

        if let Some((schema, _, reader, parser)) = di.get(&domain) {
            let query = parser.parse_query(&query)?;
            let searcher = reader.searcher();

            match format {
                QueryFormat::All => {
                    let docs = searcher.search(&*query, &tantivy::collector::DocSetCollector)?;

                    let mut lines = vec![];
                    tracing::info!(count = docs.len());

                    for doc_addr in docs {
                        let doc: TantivyDocument = searcher.doc(doc_addr)?;
                        let line = doc.to_json(schema);
                        lines.push(line);
                    }

                    out = format!("[{}]", lines.join(","));
                }
                QueryFormat::Count => {
                    let count = searcher.search(&*query, &tantivy::collector::Count)?;

                    out = count.to_string();
                }
                QueryFormat::Top => {
                    if let Some(limit) = limit {
                        let docs = searcher
                            .search(&*query, &tantivy::collector::TopDocs::with_limit(limit))?;

                        let mut lines = vec![];
                        tracing::info!(count = docs.len());

                        for (_, doc_addr) in docs {
                            let doc: TantivyDocument = searcher.doc(doc_addr)?;
                            let line = doc.to_json(schema);
                            lines.push(line);
                        }

                        out = format!("[{}]", lines.join(","));
                    }
                }
            }
        }

        Ok(out)
    }

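    /// One indexing pass: resume from the cursor stored under `<dir>/cursor`, feed each
    /// new JSON log line to the writer of the domain named by its spans, then advance
    /// the cursor and commit all writers.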
    #[instrument(skip(dir, domain_indexes), err)]
    fn index(
        dir: &Path,
        domain_indexes: &Arc<RwLock<BTreeMap<String, (Schema, Index, IndexReader, QueryParser)>>>,
    ) -> Result<(), Box<dyn Error>> {
        tracing::info!("indexing");

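        // The cursor records the last file and line already indexed; if either part is
        // missing, start from the beginning of the directory.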
        let cursor_path = dir.join("cursor");

        let file_cursor_path = cursor_path.join("file");
        let file_cursor = match std::fs::read(&file_cursor_path) {
            Ok(file) => dir.join(std::str::from_utf8(&file)?),
            Err(_) => dir.join("."),
        };

        let line_cursor_path = cursor_path.join("line");
        let line_cursor = match std::fs::read(&line_cursor_path) {
            Ok(file) => std::str::from_utf8(&file)?.parse::<usize>()?,
            Err(_) => 0,
        };

        tracing::info!(file_cursor = %file_cursor.display(), line_cursor);

        let mut log_files = std::fs::read_dir(dir)?
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, std::io::Error>>()?;
        log_files.sort();

        let di = domain_indexes.read();

        let mut writer_map = BTreeMap::new();

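        // Open one writer per registered domain, each with a 15 MB indexing heap.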
        for (domain, (schema, index, _, _)) in di.iter() {
            let writer: IndexWriter = index.writer(15_000_000)?;

            writer_map.insert(domain, (schema, writer));
        }

        let mut last_log_file = file_cursor.clone();
        let mut last_log_line = line_cursor;

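        // Walk the log files in lexical order, skipping files that sort before the
        // cursor file and any subdirectories (e.g. `search/` and `cursor/`).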
        for log_file in log_files {
            if log_file < file_cursor || log_file.is_dir() {
                continue;
            }

            tracing::info!(file = %log_file.display());

            last_log_file = log_file.clone();

            let file = File::open(&log_file)?;

            let reader = BufReader::new(file);

            for (i, line) in reader.lines().enumerate() {
                // Resume within the cursor file: everything up to and including the
                // recorded line index has already been indexed.
                if log_file == file_cursor && i <= line_cursor {
                    continue;
                }

                let line = line?;
                let log = serde_json::from_str::<LogLine>(&line)?;

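                // Route the line by the first span that names a domain; lines without a
                // registered domain are skipped.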
                let mut log_domain = String::new();

                if let Some(spans) = log.spans {
                    for span in spans {
                        if let Some(domain) = span.domain {
                            log_domain = domain;
                            break;
                        }
                    }
                }

                if let Some((schema, writer)) = writer_map.get(&log_domain) {
                    let doc = TantivyDocument::parse_json(schema, &line)?;

                    writer.add_document(doc)?;
                }

                last_log_line = i;
            }

            tracing::info!(lines = last_log_line + 1);
        }

        tracing::info!(file_cursor = %last_log_file.display(), line_cursor = last_log_line);

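        // Persist the cursor so the next pass resumes after the last indexed line.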
        let mut file_cursor = File::create(&file_cursor_path)?;
        file_cursor.write_all(
            last_log_file
                .file_name()
                .and_then(|name| name.to_str())
                .ok_or("log file name is not valid UTF-8")?
                .as_bytes(),
        )?;

        let mut line_cursor = File::create(&line_cursor_path)?;
        line_cursor.write_all(last_log_line.to_string().as_bytes())?;

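        // Commit every writer; readers pick up the new segments via their
        // `OnCommitWithDelay` reload policy.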
        for (_, (_, mut writer)) in writer_map {
            writer.commit()?;
        }

        Ok(())
    }

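    /// Spawns the background indexing and cleanup loops and returns their join handles.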
    pub fn start(&self) -> Result<(JoinHandle<()>, JoinHandle<()>), Box<dyn Error>> {
        let cursor_path = self.dir.join("cursor");
        std::fs::create_dir_all(&cursor_path)?;

        let index_interval_s = self.index_interval_s;
        let cleanup_interval_s = self.cleanup_interval_s;
        let max_lines = self.max_lines;
        let max_ttl_days = self.max_ttl_days;
        let dir = self.dir.clone();
        let domain_indexes = self.domain_indexes.clone();

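        // Indexing task: run one pass every `index_interval_s` seconds.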
        let index_handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(index_interval_s)).await;

                match Self::index(&dir, &domain_indexes) {
                    Ok(()) => tracing::info!("indexed"),
                    Err(err) => tracing::error!(%err),
                };
            }
        });

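        // Cleanup task: enforcing `max_lines` / `max_ttl_days` retention is still a todo.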
        let cleanup_handle = tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(cleanup_interval_s)).await;
                tracing::info!(max_lines, max_ttl_days, "todo: cleanup");
            }
        });

        Ok((index_handle, cleanup_handle))
    }
}