1use crate::{cache::FileIndexKey, error::Result, facets::Facets};
7use journal_core::collections::HashSet;
8use journal_index::{Bitmap, FieldName, FieldValuePair, FileIndex, Filter, Seconds};
9use lru::LruCache;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::num::NonZeroUsize;
13use std::time::Duration;
14
15#[allow(unused_imports)]
16use tracing::{debug, error};
17
/// Picks the histogram bucket width, in seconds, for a time range that is
/// `time_range_duration` seconds long.
///
/// The result is the largest width from a fixed ladder (1 second up to
/// 30 days) that still splits the range into at least 50 whole buckets.
/// Ranges too short for 50 one-second buckets fall back to a width of 1.
pub fn calculate_bucket_duration(time_range_duration: u32) -> u32 {
    /// Minimum number of whole buckets the chosen width must produce.
    const MIN_BUCKETS: u64 = 50;

    const fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }
    const fn mins(n: u64) -> Duration {
        Duration::from_secs(n * 60)
    }
    const fn hours(n: u64) -> Duration {
        Duration::from_secs(n * 60 * 60)
    }
    const fn days(n: u64) -> Duration {
        Duration::from_secs(n * 24 * 60 * 60)
    }

    // Allowed bucket widths, in ascending order.
    const LADDER: [Duration; 25] = [
        // Seconds
        secs(1),
        secs(2),
        secs(5),
        secs(10),
        secs(15),
        secs(30),
        // Minutes
        mins(1),
        mins(2),
        mins(3),
        mins(5),
        mins(10),
        mins(15),
        mins(30),
        // Hours
        hours(1),
        hours(2),
        hours(6),
        hours(8),
        hours(12),
        // Days
        days(1),
        days(2),
        days(3),
        days(5),
        days(7),
        days(14),
        days(30),
    ];

    let range_secs = u64::from(time_range_duration);

    // Walk from the widest candidate down; the first one that yields enough
    // buckets is the answer.
    for width in LADDER.iter().rev() {
        let width_secs = width.as_secs();
        if range_secs / width_secs >= MIN_BUCKETS {
            // The widest ladder entry is 30 days (2_592_000 s), so the cast
            // cannot truncate.
            return width_secs as u32;
        }
    }

    1
}
74
/// Identifies one histogram bucket: a time window together with the facets
/// and filter it is evaluated with.
///
/// Instances are used as keys of the engine's response cache and of the
/// intermediate `HashMap`s in this file, hence the `Eq` and `Hash` derives.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct BucketRequest {
    /// Start of the bucket's time window.
    pub start: Seconds,
    /// End of the bucket's time window.
    // NOTE(review): whether `end` is inclusive or exclusive is decided by
    // `FileIndex::count_entries_in_time_range`, which is not visible here.
    pub end: Seconds,
    /// Facets the request was built with.
    pub facets: Facets,
    /// Filter applied to entries; when `is_none()` reports true no filter
    /// bitmap is evaluated and filtered counts equal unfiltered counts.
    pub filter_expr: Filter,
}
88
89impl BucketRequest {
90 pub fn duration(&self) -> Seconds {
92 self.end - self.start
93 }
94}
95
/// Counts accumulated for a single [`BucketRequest`] across all files that
/// overlap it.
#[derive(Debug, Clone)]
pub struct BucketResponse {
    /// Per field=value pair: `(unfiltered count, filtered count)` of entries
    /// inside the bucket's time window.
    pub fv_counts: HashMap<FieldValuePair, (usize, usize)>,
    /// Names of fields that a contributing file reports via `fields()` but
    /// does not report as indexed.
    pub unindexed_fields: HashSet<FieldName>,
    /// `(unfiltered total, filtered total)` of entries inside the bucket's
    /// time window.
    pub total_entries: (usize, usize),
}
106
107impl BucketResponse {
108 pub(crate) fn new() -> Self {
110 Self {
111 fv_counts: HashMap::default(),
112 unindexed_fields: HashSet::default(),
113 total_entries: (0, 0),
114 }
115 }
116
117 pub fn indexed_fields(&self) -> HashSet<FieldName> {
119 self.fv_counts
120 .keys()
121 .map(|pair| pair.extract_field())
122 .collect()
123 }
124}
125
/// Result of a histogram computation: one entry per bucket that could be
/// resolved, each pairing the request with its computed response.
#[derive(Debug, Clone)]
pub struct Histogram {
    /// Buckets in the order their requests were generated from the query
    /// time range. The accessor methods on this type expect at least one
    /// bucket and panic otherwise.
    pub buckets: Vec<(BucketRequest, BucketResponse)>,
}
134
135impl Histogram {
136 pub fn start_time(&self) -> Seconds {
138 let bucket_request = &self
139 .buckets
140 .first()
141 .expect("histogram with at least one bucket")
142 .0;
143 bucket_request.start
144 }
145
146 pub fn end_time(&self) -> Seconds {
148 let bucket_request = &self
149 .buckets
150 .last()
151 .expect("histogram with at least one bucket")
152 .0;
153 bucket_request.end
154 }
155
156 pub fn bucket_duration(&self) -> Seconds {
158 self.buckets
159 .first()
160 .expect("histogram with at least one bucket")
161 .0
162 .duration()
163 }
164
165 pub fn discovered_fields(&self) -> Vec<FieldName> {
167 let mut fields = HashSet::default();
169 for (_, bucket_response) in &self.buckets {
170 fields.extend(bucket_response.indexed_fields());
171 fields.extend(bucket_response.unindexed_fields.iter().cloned());
172 }
173
174 let mut v: Vec<FieldName> = fields.into_iter().collect();
175 v.sort();
176 v
177 }
178}
179
/// Computes histograms from file indexes and remembers per-bucket results.
pub struct HistogramEngine {
    /// LRU cache of bucket responses keyed by their request, guarded by a
    /// read/write lock so the engine can be used through `&self`.
    responses: RwLock<LruCache<BucketRequest, BucketResponse>>,
}
187
188impl HistogramEngine {
189 pub fn new() -> Self {
191 Self::with_capacity(1000)
192 }
193
194 pub fn with_capacity(capacity: usize) -> Self {
199 Self {
200 responses: RwLock::new(LruCache::new(
201 NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
202 )),
203 }
204 }
205
206 pub fn compute_from_indexes(
217 &self,
218 indexed_files: &[(FileIndexKey, FileIndex)],
219 time_range: &crate::QueryTimeRange,
220 facets: &[String],
221 filter_expr: &Filter,
222 ) -> Result<Histogram> {
223 let facets = Facets::new(facets);
224 let bucket_requests = bucket_requests_for(time_range, &facets, filter_expr);
225 let buckets_to_compute = self.buckets_to_compute(&bucket_requests);
226
227 if buckets_to_compute.is_empty() {
228 return Ok(self.histogram_from_cache(bucket_requests));
229 }
230
231 let (new_responses, bucket_cacheable) =
232 compute_bucket_responses(indexed_files, &buckets_to_compute);
233 self.cache_computed_responses(&new_responses, &bucket_cacheable);
234
235 Ok(self.histogram_from_responses(bucket_requests, &new_responses))
236 }
237
238 fn buckets_to_compute(&self, bucket_requests: &[BucketRequest]) -> Vec<BucketRequest> {
239 let responses = self.responses.read();
240 bucket_requests
241 .iter()
242 .filter(|br| !responses.contains(br))
243 .cloned()
244 .collect()
245 }
246
247 fn cache_computed_responses(
248 &self,
249 new_responses: &HashMap<BucketRequest, BucketResponse>,
250 bucket_cacheable: &HashMap<BucketRequest, bool>,
251 ) {
252 let mut responses_guard = self.responses.write();
253 for (bucket_request, response) in new_responses {
254 if bucket_cacheable
255 .get(bucket_request)
256 .copied()
257 .unwrap_or(false)
258 {
259 responses_guard.put(bucket_request.clone(), response.clone());
260 }
261 }
262 }
263
264 fn histogram_from_responses(
265 &self,
266 bucket_requests: Vec<BucketRequest>,
267 new_responses: &HashMap<BucketRequest, BucketResponse>,
268 ) -> Histogram {
269 let mut responses_guard = self.responses.write();
270 let buckets = bucket_requests
271 .into_iter()
272 .filter_map(|bucket_request| {
273 responses_guard
274 .get(&bucket_request)
275 .cloned()
276 .or_else(|| new_responses.get(&bucket_request).cloned())
277 .map(|response| (bucket_request, response))
278 })
279 .collect();
280
281 Histogram { buckets }
282 }
283
284 fn histogram_from_cache(&self, bucket_requests: Vec<BucketRequest>) -> Histogram {
285 let mut responses = self.responses.write();
286 let buckets = bucket_requests
287 .into_iter()
288 .filter_map(|bucket_request| {
289 responses
290 .get(&bucket_request)
291 .map(|response| (bucket_request, response.clone()))
292 })
293 .collect();
294
295 Histogram { buckets }
296 }
297}
298
299fn bucket_requests_for(
300 time_range: &crate::QueryTimeRange,
301 facets: &Facets,
302 filter_expr: &Filter,
303) -> Vec<BucketRequest> {
304 time_range
305 .buckets()
306 .map(|(start, end)| BucketRequest {
307 start: Seconds(start),
308 end: Seconds(end),
309 facets: facets.clone(),
310 filter_expr: filter_expr.clone(),
311 })
312 .collect()
313}
314
315fn compute_bucket_responses(
316 indexed_files: &[(FileIndexKey, FileIndex)],
317 buckets_to_compute: &[BucketRequest],
318) -> (
319 HashMap<BucketRequest, BucketResponse>,
320 HashMap<BucketRequest, bool>,
321) {
322 let mut new_responses = empty_bucket_responses(buckets_to_compute);
323 let mut bucket_cacheable = initially_cacheable_buckets(buckets_to_compute);
324
325 for (_, file_index) in indexed_files {
326 process_file_buckets(
327 file_index,
328 buckets_to_compute,
329 &mut new_responses,
330 &mut bucket_cacheable,
331 );
332 }
333
334 (new_responses, bucket_cacheable)
335}
336
337fn empty_bucket_responses(
338 bucket_requests: &[BucketRequest],
339) -> HashMap<BucketRequest, BucketResponse> {
340 bucket_requests
341 .iter()
342 .map(|br| (br.clone(), BucketResponse::new()))
343 .collect()
344}
345
346fn initially_cacheable_buckets(bucket_requests: &[BucketRequest]) -> HashMap<BucketRequest, bool> {
347 bucket_requests
348 .iter()
349 .map(|br| (br.clone(), true))
350 .collect()
351}
352
353fn process_file_buckets(
354 file_index: &FileIndex,
355 bucket_requests: &[BucketRequest],
356 responses: &mut HashMap<BucketRequest, BucketResponse>,
357 bucket_cacheable: &mut HashMap<BucketRequest, bool>,
358) {
359 for bucket_request in bucket_requests {
360 let Some(response) = responses.get_mut(bucket_request) else {
361 continue;
362 };
363 if !file_overlaps_bucket(file_index, bucket_request) {
364 continue;
365 }
366 if file_index.online() {
367 bucket_cacheable.insert(bucket_request.clone(), false);
368 }
369
370 let filter_bitmap = filter_bitmap_for_bucket(file_index, bucket_request);
371 update_bucket_totals(file_index, bucket_request, filter_bitmap.as_ref(), response);
372 record_unindexed_fields(file_index, response);
373 count_indexed_field_values(file_index, bucket_request, filter_bitmap.as_ref(), response);
374 }
375}
376
377fn file_overlaps_bucket(file_index: &FileIndex, bucket_request: &BucketRequest) -> bool {
378 file_index.start_time() < bucket_request.end && file_index.end_time() > bucket_request.start
379}
380
381fn filter_bitmap_for_bucket(
382 file_index: &FileIndex,
383 bucket_request: &BucketRequest,
384) -> Option<Bitmap> {
385 (!bucket_request.filter_expr.is_none()).then(|| bucket_request.filter_expr.evaluate(file_index))
386}
387
388fn update_bucket_totals(
389 file_index: &FileIndex,
390 bucket_request: &BucketRequest,
391 filter_bitmap: Option<&Bitmap>,
392 response: &mut BucketResponse,
393) {
394 let all_entries = Bitmap::insert_range(0..file_index.total_entries() as u32);
395 let unfiltered_total = count_entries(file_index, &all_entries, bucket_request);
396 let filtered_total = filter_bitmap
397 .map(|bitmap| count_entries(file_index, bitmap, bucket_request))
398 .unwrap_or(unfiltered_total);
399
400 response.total_entries.0 += unfiltered_total;
401 response.total_entries.1 += filtered_total;
402}
403
404fn count_entries(file_index: &FileIndex, bitmap: &Bitmap, bucket_request: &BucketRequest) -> usize {
405 file_index
406 .count_entries_in_time_range(bitmap, bucket_request.start, bucket_request.end)
407 .unwrap_or(0)
408}
409
410fn record_unindexed_fields(file_index: &FileIndex, response: &mut BucketResponse) {
411 for field in file_index.fields() {
412 if !file_index.is_indexed(field)
413 && let Some(field_name) = FieldName::new(field)
414 {
415 response.unindexed_fields.insert(field_name);
416 }
417 }
418}
419
420fn count_indexed_field_values(
421 file_index: &FileIndex,
422 bucket_request: &BucketRequest,
423 filter_bitmap: Option<&Bitmap>,
424 response: &mut BucketResponse,
425) {
426 for (indexed_field, field_bitmap) in file_index.bitmaps() {
427 let unfiltered_count = count_entries(file_index, field_bitmap, bucket_request);
428 let filtered_count = filtered_field_count(
429 file_index,
430 bucket_request,
431 field_bitmap,
432 filter_bitmap,
433 unfiltered_count,
434 );
435 add_field_counts(response, indexed_field, unfiltered_count, filtered_count);
436 }
437}
438
439fn filtered_field_count(
440 file_index: &FileIndex,
441 bucket_request: &BucketRequest,
442 field_bitmap: &Bitmap,
443 filter_bitmap: Option<&Bitmap>,
444 unfiltered_count: usize,
445) -> usize {
446 let Some(filter_bitmap) = filter_bitmap else {
447 return unfiltered_count;
448 };
449 let filtered_bitmap = field_bitmap & filter_bitmap;
450 count_entries(file_index, &filtered_bitmap, bucket_request)
451}
452
453fn add_field_counts(
454 response: &mut BucketResponse,
455 indexed_field: &FieldValuePair,
456 unfiltered_count: usize,
457 filtered_count: usize,
458) {
459 if let Some(pair) = FieldValuePair::parse(indexed_field) {
460 let counts = response.fv_counts.entry(pair).or_insert((0, 0));
461 counts.0 += unfiltered_count;
462 counts.1 += filtered_count;
463 }
464}