Skip to main content

journal_engine/
histogram.rs

1//! Histogram functionality for generating time-series data from journal files.
2//!
3//! This module provides types and services for computing histograms of journal log entries
4//! over time ranges, with support for filtering and faceted field indexing.
5
6use crate::{cache::FileIndexKey, error::Result, facets::Facets};
7use journal_core::collections::HashSet;
8use journal_index::{Bitmap, FieldName, FieldValuePair, FileIndex, Filter, Seconds};
9use lru::LruCache;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::num::NonZeroUsize;
13use std::time::Duration;
14
15#[allow(unused_imports)]
16use tracing::{debug, error};
17
/// Calculate the bucket width for a histogram spanning `time_range_duration` seconds.
///
/// Candidate widths come from a fixed table of "nice" values (1s, 2s, 5s, 10s, 15s,
/// 30s, 1m, 2m, 3m, 5m, 10m, 15m, 30m, 1h, 2h, 6h, 8h, 12h, 1d, 2d, 3d, 5d, 7d, 14d
/// and 30d) so the resulting histograms are easy to interpret. The function returns
/// the *largest* width `w` for which `time_range_duration / w` (integer division) is
/// at least 50.
///
/// Because neighbouring widths in the table differ by up to a factor of 3 (2h -> 6h),
/// the number of whole buckets is not capped at 100:
/// * for ranges from 50 seconds up to (but excluding) 1500 days it lies in `50..=149`;
/// * ranges shorter than 50 seconds fall back to 1-second buckets (fewer than 50);
/// * ranges of 1500 days or more use 30-day buckets, so the count keeps growing.
///
/// # Arguments
/// * `time_range_duration` - The duration of the time range in seconds
///
/// # Returns
/// The bucket duration in seconds (always one of the table values, at least 1).
pub fn calculate_bucket_duration(time_range_duration: u32) -> u32 {
    // Minimum number of whole buckets the chosen width should produce.
    const MIN_BUCKETS: u64 = 50;

    const MINUTE: Duration = Duration::from_secs(60);
    const HOUR: Duration = Duration::from_secs(60 * MINUTE.as_secs());
    const DAY: Duration = Duration::from_secs(24 * HOUR.as_secs());

    // Must stay sorted in ascending order: the search below walks it from the end.
    const VALID_DURATIONS: &[Duration] = &[
        // Seconds
        Duration::from_secs(1),
        Duration::from_secs(2),
        Duration::from_secs(5),
        Duration::from_secs(10),
        Duration::from_secs(15),
        Duration::from_secs(30),
        // Minutes
        MINUTE,
        Duration::from_secs(2 * MINUTE.as_secs()),
        Duration::from_secs(3 * MINUTE.as_secs()),
        Duration::from_secs(5 * MINUTE.as_secs()),
        Duration::from_secs(10 * MINUTE.as_secs()),
        Duration::from_secs(15 * MINUTE.as_secs()),
        Duration::from_secs(30 * MINUTE.as_secs()),
        // Hours
        HOUR,
        Duration::from_secs(2 * HOUR.as_secs()),
        Duration::from_secs(6 * HOUR.as_secs()),
        Duration::from_secs(8 * HOUR.as_secs()),
        Duration::from_secs(12 * HOUR.as_secs()),
        // Days
        DAY,
        Duration::from_secs(2 * DAY.as_secs()),
        Duration::from_secs(3 * DAY.as_secs()),
        Duration::from_secs(5 * DAY.as_secs()),
        Duration::from_secs(7 * DAY.as_secs()),
        Duration::from_secs(14 * DAY.as_secs()),
        Duration::from_secs(30 * DAY.as_secs()),
    ];

    let range = u64::from(time_range_duration);
    let width = VALID_DURATIONS
        .iter()
        .rev()
        .map(Duration::as_secs)
        .find(|&width| range / width >= MIN_BUCKETS)
        // Too short for even 50 one-second buckets: use the finest width available.
        .unwrap_or(1);

    u32::try_from(width).expect("largest bucket width (30 days) fits in u32")
}
74
75/// A bucket request contains a [start, end) time range along with the
76/// filter that should be applied.
77#[derive(Debug, Clone, Eq, PartialEq, Hash)]
78pub struct BucketRequest {
79    /// Start time of the bucket request
80    pub start: Seconds,
81    /// End time of the bucket request
82    pub end: Seconds,
83    /// Facets to use for file index
84    pub facets: Facets,
85    /// Applied filter expression
86    pub filter_expr: Filter,
87}
88
89impl BucketRequest {
90    /// The duration of the bucket request in seconds
91    pub fn duration(&self) -> Seconds {
92        self.end - self.start
93    }
94}
95
96/// A bucket response containing aggregated field value counts.
97#[derive(Debug, Clone)]
98pub struct BucketResponse {
99    /// Maps field=value pairs to (unfiltered, filtered) counts
100    pub fv_counts: HashMap<FieldValuePair, (usize, usize)>,
101    /// Set of fields that are not indexed
102    pub unindexed_fields: HashSet<FieldName>,
103    /// Total entry counts (unfiltered, filtered) in this bucket across all files
104    pub total_entries: (usize, usize),
105}
106
107impl BucketResponse {
108    /// Creates a new empty bucket response.
109    pub(crate) fn new() -> Self {
110        Self {
111            fv_counts: HashMap::default(),
112            unindexed_fields: HashSet::default(),
113            total_entries: (0, 0),
114        }
115    }
116
117    /// Get all indexed field names from this bucket response.
118    pub fn indexed_fields(&self) -> HashSet<FieldName> {
119        self.fv_counts
120            .keys()
121            .map(|pair| pair.extract_field())
122            .collect()
123    }
124}
125
126/// Represents a histogram of journal log entries over time.
127///
128/// A histogram contains bucketed data where each bucket represents a time range
129/// and holds aggregated counts of field values and filtering results.
130#[derive(Debug, Clone)]
131pub struct Histogram {
132    pub buckets: Vec<(BucketRequest, BucketResponse)>,
133}
134
135impl Histogram {
136    /// Returns the start time of the histogram (first bucket's start time).
137    pub fn start_time(&self) -> Seconds {
138        let bucket_request = &self
139            .buckets
140            .first()
141            .expect("histogram with at least one bucket")
142            .0;
143        bucket_request.start
144    }
145
146    /// Returns the end time of the histogram (last bucket's end time).
147    pub fn end_time(&self) -> Seconds {
148        let bucket_request = &self
149            .buckets
150            .last()
151            .expect("histogram with at least one bucket")
152            .0;
153        bucket_request.end
154    }
155
156    /// Returns the duration of each bucket in seconds.
157    pub fn bucket_duration(&self) -> Seconds {
158        self.buckets
159            .first()
160            .expect("histogram with at least one bucket")
161            .0
162            .duration()
163    }
164
165    /// Returns all discovered field names from the histogram buckets in a deterministic order.
166    pub fn discovered_fields(&self) -> Vec<FieldName> {
167        // Collect all unique fields from all buckets
168        let mut fields = HashSet::default();
169        for (_, bucket_response) in &self.buckets {
170            fields.extend(bucket_response.indexed_fields());
171            fields.extend(bucket_response.unindexed_fields.iter().cloned());
172        }
173
174        let mut v: Vec<FieldName> = fields.into_iter().collect();
175        v.sort();
176        v
177    }
178}
179
180/// Engine for computing histograms from journal files.
181///
182/// The engine maintains caches and resources for efficiently computing histograms
183/// across multiple queries. It can be reused for multiple histogram computations.
184pub struct HistogramEngine {
185    responses: RwLock<LruCache<BucketRequest, BucketResponse>>,
186}
187
188impl HistogramEngine {
189    /// Creates a new HistogramEngine with a default capacity of 1000 bucket responses.
190    pub fn new() -> Self {
191        Self::with_capacity(1000)
192    }
193
194    /// Creates a new HistogramEngine with the specified cache capacity.
195    ///
196    /// The capacity determines how many bucket responses will be cached before
197    /// old entries are evicted using an LRU policy.
198    pub fn with_capacity(capacity: usize) -> Self {
199        Self {
200            responses: RwLock::new(LruCache::new(
201                NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
202            )),
203        }
204    }
205
206    /// Compute a histogram from pre-indexed files.
207    ///
208    /// This method allows you to compute histograms from file indexes that have
209    /// already been loaded, avoiding redundant cache lookups and file discoveries.
210    ///
211    /// # Arguments
212    /// * `indexed_files` - Pre-computed file indexes
213    /// * `time_range` - Query time range with aligned boundaries and bucket duration
214    /// * `facets` - Fields to index
215    /// * `filter_expr` - Filter expression to apply
216    pub fn compute_from_indexes(
217        &self,
218        indexed_files: &[(FileIndexKey, FileIndex)],
219        time_range: &crate::QueryTimeRange,
220        facets: &[String],
221        filter_expr: &Filter,
222    ) -> Result<Histogram> {
223        let facets = Facets::new(facets);
224        let bucket_requests = bucket_requests_for(time_range, &facets, filter_expr);
225        let buckets_to_compute = self.buckets_to_compute(&bucket_requests);
226
227        if buckets_to_compute.is_empty() {
228            return Ok(self.histogram_from_cache(bucket_requests));
229        }
230
231        let (new_responses, bucket_cacheable) =
232            compute_bucket_responses(indexed_files, &buckets_to_compute);
233        self.cache_computed_responses(&new_responses, &bucket_cacheable);
234
235        Ok(self.histogram_from_responses(bucket_requests, &new_responses))
236    }
237
238    fn buckets_to_compute(&self, bucket_requests: &[BucketRequest]) -> Vec<BucketRequest> {
239        let responses = self.responses.read();
240        bucket_requests
241            .iter()
242            .filter(|br| !responses.contains(br))
243            .cloned()
244            .collect()
245    }
246
247    fn cache_computed_responses(
248        &self,
249        new_responses: &HashMap<BucketRequest, BucketResponse>,
250        bucket_cacheable: &HashMap<BucketRequest, bool>,
251    ) {
252        let mut responses_guard = self.responses.write();
253        for (bucket_request, response) in new_responses {
254            if bucket_cacheable
255                .get(bucket_request)
256                .copied()
257                .unwrap_or(false)
258            {
259                responses_guard.put(bucket_request.clone(), response.clone());
260            }
261        }
262    }
263
264    fn histogram_from_responses(
265        &self,
266        bucket_requests: Vec<BucketRequest>,
267        new_responses: &HashMap<BucketRequest, BucketResponse>,
268    ) -> Histogram {
269        let mut responses_guard = self.responses.write();
270        let buckets = bucket_requests
271            .into_iter()
272            .filter_map(|bucket_request| {
273                responses_guard
274                    .get(&bucket_request)
275                    .cloned()
276                    .or_else(|| new_responses.get(&bucket_request).cloned())
277                    .map(|response| (bucket_request, response))
278            })
279            .collect();
280
281        Histogram { buckets }
282    }
283
284    fn histogram_from_cache(&self, bucket_requests: Vec<BucketRequest>) -> Histogram {
285        let mut responses = self.responses.write();
286        let buckets = bucket_requests
287            .into_iter()
288            .filter_map(|bucket_request| {
289                responses
290                    .get(&bucket_request)
291                    .map(|response| (bucket_request, response.clone()))
292            })
293            .collect();
294
295        Histogram { buckets }
296    }
297}
298
299fn bucket_requests_for(
300    time_range: &crate::QueryTimeRange,
301    facets: &Facets,
302    filter_expr: &Filter,
303) -> Vec<BucketRequest> {
304    time_range
305        .buckets()
306        .map(|(start, end)| BucketRequest {
307            start: Seconds(start),
308            end: Seconds(end),
309            facets: facets.clone(),
310            filter_expr: filter_expr.clone(),
311        })
312        .collect()
313}
314
315fn compute_bucket_responses(
316    indexed_files: &[(FileIndexKey, FileIndex)],
317    buckets_to_compute: &[BucketRequest],
318) -> (
319    HashMap<BucketRequest, BucketResponse>,
320    HashMap<BucketRequest, bool>,
321) {
322    let mut new_responses = empty_bucket_responses(buckets_to_compute);
323    let mut bucket_cacheable = initially_cacheable_buckets(buckets_to_compute);
324
325    for (_, file_index) in indexed_files {
326        process_file_buckets(
327            file_index,
328            buckets_to_compute,
329            &mut new_responses,
330            &mut bucket_cacheable,
331        );
332    }
333
334    (new_responses, bucket_cacheable)
335}
336
337fn empty_bucket_responses(
338    bucket_requests: &[BucketRequest],
339) -> HashMap<BucketRequest, BucketResponse> {
340    bucket_requests
341        .iter()
342        .map(|br| (br.clone(), BucketResponse::new()))
343        .collect()
344}
345
346fn initially_cacheable_buckets(bucket_requests: &[BucketRequest]) -> HashMap<BucketRequest, bool> {
347    bucket_requests
348        .iter()
349        .map(|br| (br.clone(), true))
350        .collect()
351}
352
353fn process_file_buckets(
354    file_index: &FileIndex,
355    bucket_requests: &[BucketRequest],
356    responses: &mut HashMap<BucketRequest, BucketResponse>,
357    bucket_cacheable: &mut HashMap<BucketRequest, bool>,
358) {
359    for bucket_request in bucket_requests {
360        let Some(response) = responses.get_mut(bucket_request) else {
361            continue;
362        };
363        if !file_overlaps_bucket(file_index, bucket_request) {
364            continue;
365        }
366        if file_index.online() {
367            bucket_cacheable.insert(bucket_request.clone(), false);
368        }
369
370        let filter_bitmap = filter_bitmap_for_bucket(file_index, bucket_request);
371        update_bucket_totals(file_index, bucket_request, filter_bitmap.as_ref(), response);
372        record_unindexed_fields(file_index, response);
373        count_indexed_field_values(file_index, bucket_request, filter_bitmap.as_ref(), response);
374    }
375}
376
377fn file_overlaps_bucket(file_index: &FileIndex, bucket_request: &BucketRequest) -> bool {
378    file_index.start_time() < bucket_request.end && file_index.end_time() > bucket_request.start
379}
380
381fn filter_bitmap_for_bucket(
382    file_index: &FileIndex,
383    bucket_request: &BucketRequest,
384) -> Option<Bitmap> {
385    (!bucket_request.filter_expr.is_none()).then(|| bucket_request.filter_expr.evaluate(file_index))
386}
387
388fn update_bucket_totals(
389    file_index: &FileIndex,
390    bucket_request: &BucketRequest,
391    filter_bitmap: Option<&Bitmap>,
392    response: &mut BucketResponse,
393) {
394    let all_entries = Bitmap::insert_range(0..file_index.total_entries() as u32);
395    let unfiltered_total = count_entries(file_index, &all_entries, bucket_request);
396    let filtered_total = filter_bitmap
397        .map(|bitmap| count_entries(file_index, bitmap, bucket_request))
398        .unwrap_or(unfiltered_total);
399
400    response.total_entries.0 += unfiltered_total;
401    response.total_entries.1 += filtered_total;
402}
403
404fn count_entries(file_index: &FileIndex, bitmap: &Bitmap, bucket_request: &BucketRequest) -> usize {
405    file_index
406        .count_entries_in_time_range(bitmap, bucket_request.start, bucket_request.end)
407        .unwrap_or(0)
408}
409
410fn record_unindexed_fields(file_index: &FileIndex, response: &mut BucketResponse) {
411    for field in file_index.fields() {
412        if !file_index.is_indexed(field)
413            && let Some(field_name) = FieldName::new(field)
414        {
415            response.unindexed_fields.insert(field_name);
416        }
417    }
418}
419
420fn count_indexed_field_values(
421    file_index: &FileIndex,
422    bucket_request: &BucketRequest,
423    filter_bitmap: Option<&Bitmap>,
424    response: &mut BucketResponse,
425) {
426    for (indexed_field, field_bitmap) in file_index.bitmaps() {
427        let unfiltered_count = count_entries(file_index, field_bitmap, bucket_request);
428        let filtered_count = filtered_field_count(
429            file_index,
430            bucket_request,
431            field_bitmap,
432            filter_bitmap,
433            unfiltered_count,
434        );
435        add_field_counts(response, indexed_field, unfiltered_count, filtered_count);
436    }
437}
438
439fn filtered_field_count(
440    file_index: &FileIndex,
441    bucket_request: &BucketRequest,
442    field_bitmap: &Bitmap,
443    filter_bitmap: Option<&Bitmap>,
444    unfiltered_count: usize,
445) -> usize {
446    let Some(filter_bitmap) = filter_bitmap else {
447        return unfiltered_count;
448    };
449    let filtered_bitmap = field_bitmap & filter_bitmap;
450    count_entries(file_index, &filtered_bitmap, bucket_request)
451}
452
453fn add_field_counts(
454    response: &mut BucketResponse,
455    indexed_field: &FieldValuePair,
456    unfiltered_count: usize,
457    filtered_count: usize,
458) {
459    if let Some(pair) = FieldValuePair::parse(indexed_field) {
460        let counts = response.fv_counts.entry(pair).or_insert((0, 0));
461        counts.0 += unfiltered_count;
462        counts.1 += filtered_count;
463    }
464}