Skip to main content

journal/
netdata.rs

1use crate::explorer::{
2    ExplorerSamplingState, empty_histogram_for_query, histogram_bucket_count_for_query,
3};
4use crate::{
5    Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
6    ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
7    ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
8    FileReader, ReaderOptions, Result, SdkError,
9};
10use chrono::{DateTime, Utc};
11use serde_json::{Map, Value, json};
12use std::cell::RefCell;
13use std::cmp::{Ordering, Reverse};
14use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
15#[cfg(unix)]
16use std::ffi::CStr;
17use std::path::{Path, PathBuf};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
// Names installed by `NetdataFunctionConfig::systemd_journal()`; the selector
// name/help are also the fallbacks `NetdataJournalFunction::new` applies to
// empty values.
20const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
21const DEFAULT_SOURCE_SELECTOR_NAME: &str = "Journal Sources";
22const DEFAULT_SOURCE_SELECTOR_HELP: &str = "Select the logs source to query";
// NOTE(review): the request defaults and limits below are presumably consumed
// by the request parser, which is not visible in this part of the file.
23const DEFAULT_ITEMS_TO_RETURN: usize = 200;
24const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
25const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
26const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
// Three 365-day years, in seconds.
27const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
28const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
29const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
// One hundred 365-day years, in seconds. Used as the timeout when a caller
// passes 0 seconds and by `NetdataFunctionRunOptions::default()`.
30const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
// 5_000_000 (five seconds if the unit is microseconds, as the name suggests);
// this is the value handed to `NetdataRequest::to_explorer_query`.
31const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
// 120_000_000 (two minutes if the unit is microseconds).
32const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
// `id` and `name` reported by `build_facets` for a zero-count facet option
// whose raw value is empty.
33const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
34const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
35const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
36const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
37const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Source-type flags: each constant occupies its own bit, so they can be OR-ed.
// NOTE(review): presumably the values stored in
// `NetdataJournalFileMetadata::source_type` — confirm against the selector code.
38const SOURCE_TYPE_ALL: u64 = 1 << 0;
39const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
40const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
41const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
42const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
43const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
44const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
45
// Public aliases of the private `SOURCE_TYPE_*` bit flags, value for value.
46pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
47pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
48pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
49pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
50pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
51pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
52pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
53
// Fixed list of request parameter names.
// NOTE(review): presumably combined with facet field names by
// `accepted_params_from_fields`, which is not visible here — confirm.
54const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
55    "info",
56    "__logs_sources",
57    "after",
58    "before",
59    "anchor",
60    "direction",
61    "last",
62    "query",
63    "facets",
64    "histogram",
65    "if_modified_since",
66    "data_only",
67    "delta",
68    "tail",
69    "sampling",
70    "slice",
71];
72
// Field names copied into `NetdataFunctionConfig::default_view_keys` by
// `NetdataFunctionConfig::systemd_journal()`. `build_columns` places the
// configured view keys right after the `timestamp` and `rowOptions` columns.
73const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
74    "_HOSTNAME",
75    "ND_JOURNAL_PROCESS",
76    "MESSAGE",
77    "PRIORITY",
78    "SYSLOG_FACILITY",
79    "ERRNO",
80    "ND_JOURNAL_FILE",
81    "SYSLOG_IDENTIFIER",
82    "UNIT",
83    "USER_UNIT",
84    "MESSAGE_ID",
85    "_BOOT_ID",
86    "_SYSTEMD_OWNER_UID",
87    "_UID",
88    "OBJECT_SYSTEMD_OWNER_UID",
89    "OBJECT_UID",
90    "_GID",
91    "OBJECT_GID",
92    "_CAP_EFFECTIVE",
93    "_AUDIT_LOGINUID",
94    "OBJECT_AUDIT_LOGINUID",
95    "_SOURCE_REALTIME_TIMESTAMP",
96];
97
// Field names copied into `NetdataFunctionConfig::default_facets` by
// `NetdataFunctionConfig::systemd_journal()`.
98const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
99    "_HOSTNAME",
100    "PRIORITY",
101    "SYSLOG_FACILITY",
102    "ERRNO",
103    "SYSLOG_IDENTIFIER",
104    "UNIT",
105    "USER_UNIT",
106    "MESSAGE_ID",
107    "_BOOT_ID",
108    "_SYSTEMD_OWNER_UID",
109    "_UID",
110    "OBJECT_SYSTEMD_OWNER_UID",
111    "OBJECT_UID",
112    "_GID",
113    "OBJECT_GID",
114    "_AUDIT_LOGINUID",
115    "OBJECT_AUDIT_LOGINUID",
116    "CODE_FILE",
117    "_SYSTEMD_UNIT",
118    "_SYSTEMD_USER_SLICE",
119    "CODE_FUNC",
120    "_TRANSPORT",
121    "_COMM",
122    "_RUNTIME_SCOPE",
123    "_MACHINE_ID",
124    "_SYSTEMD_SLICE",
125    "UNIT_RESULT",
126    "_SYSTEMD_CGROUP",
127    "_EXE",
128    "_SYSTEMD_USER_UNIT",
129    "_SYSTEMD_SESSION",
130    "COREDUMP_CGROUP",
131    "COREDUMP_USER_UNIT",
132    "COREDUMP_UNIT",
133    "COREDUMP_SIGNAL_NAME",
134    "COREDUMP_COMM",
135    "_UDEV_DEVNODE",
136    "_KERNEL_SUBSYSTEM",
137    "OBJECT_EXE",
138    "OBJECT_SYSTEMD_CGROUP",
139    "OBJECT_COMM",
140    "OBJECT_SYSTEMD_UNIT",
141    "OBJECT_SYSTEMD_USER_UNIT",
142    "_SELINUX_CONTEXT",
143    "_NAMESPACE",
144    "OBJECT_SYSTEMD_SESSION",
145    "CONTAINER_ID",
146    "CONTAINER_NAME",
147    "CONTAINER_TAG",
148    "IMAGE_NAME",
149    "ND_NIDL_NODE",
150    "ND_NIDL_CONTEXT",
151    "ND_LOG_SOURCE",
152    "ND_ALERT_NAME",
153    "ND_ALERT_CLASS",
154    "ND_ALERT_COMPONENT",
155    "ND_ALERT_TYPE",
156    "ND_ALERT_STATUS",
157];
158
/// Configuration carried by a `NetdataJournalFunction`.
159#[derive(Debug, Clone)]
160pub struct NetdataFunctionConfig {
    /// Function name; the systemd preset uses `"systemd-journal"`.
161    pub function_name: String,
    /// Source selector name; `NetdataJournalFunction::new` replaces an empty
    /// value with the built-in default.
162    pub source_selector_name: String,
    /// Source selector help text; `NetdataJournalFunction::new` replaces an
    /// empty value with the built-in default.
163    pub source_selector_help: String,
    /// Default facet fields.
    /// NOTE(review): presumably applied by request parsing, which is not
    /// visible here — confirm.
164    pub default_facets: Vec<String>,
    /// Fields that `build_columns` orders immediately after the `timestamp`
    /// and `rowOptions` columns.
165    pub default_view_keys: Vec<String>,
    /// Optional default histogram field (`"PRIORITY"` in the systemd preset).
166    pub default_histogram: Option<String>,
    /// Options passed to `FileReader::open_with_options` for each journal file.
167    pub reader_options: ReaderOptions,
    /// Strategy passed to the reader when a file is explored.
168    pub explorer_strategy: ExplorerStrategy,
169}
170
171impl NetdataFunctionConfig {
172    pub fn systemd_journal() -> Self {
173        Self {
174            function_name: DEFAULT_FUNCTION_NAME.to_string(),
175            source_selector_name: DEFAULT_SOURCE_SELECTOR_NAME.to_string(),
176            source_selector_help: DEFAULT_SOURCE_SELECTOR_HELP.to_string(),
177            default_facets: SYSTEMD_DEFAULT_FACETS
178                .iter()
179                .map(|field| (*field).to_string())
180                .collect(),
181            default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
182                .iter()
183                .map(|field| (*field).to_string())
184                .collect(),
185            default_histogram: Some("PRIORITY".to_string()),
186            reader_options: ReaderOptions::snapshot(),
187            explorer_strategy: ExplorerStrategy::Traversal,
188        }
189    }
190}
191
192impl Default for NetdataFunctionConfig {
193    fn default() -> Self {
194        Self::systemd_journal()
195    }
196}
197
/// Per-response lookup state handed to `NetdataFunctionProfile` display hooks.
198#[derive(Debug, Default)]
199pub struct DisplayContext {
    /// Filled from `collect_boot_first_realtime` when a response is built.
    /// NOTE(review): keys look like raw boot ids and values like realtime
    /// timestamps — confirm against that helper.
200    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    /// Interior-mutable cache, so it can be updated through `&DisplayContext`.
    /// NOTE(review): presumably maps raw uid text to its display form.
201    uid_display_cache: RefCell<BTreeMap<String, String>>,
    /// Interior-mutable cache; presumably the gid counterpart of the above.
202    gid_display_cache: RefCell<BTreeMap<String, String>>,
203}
204
/// Tells a display hook which part of the response a value is rendered for.
205#[derive(Debug, Clone, Copy)]
206pub enum DisplayScope {
    /// Cell of a data row (used by `build_data_rows`).
207    Data,
    /// Facet option name (used by `facet_option_name`).
208    Facet,
    /// NOTE(review): presumably histogram dimensions; the call site is not
    /// fully visible in this part of the file.
209    Histogram,
210}
211
212pub trait NetdataFunctionProfile {
213    fn field_display_value(
214        &self,
215        _context: &DisplayContext,
216        _scope: DisplayScope,
217        _field: &str,
218        value: &[u8],
219    ) -> Value {
220        Value::String(String::from_utf8_lossy(value).into_owned())
221    }
222
223    fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
224        match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
225            Value::String(value) => value,
226            other => other.to_string(),
227        }
228    }
229
230    fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
231        if let Some(priority) = first_value(fields, "PRIORITY") {
232            return json!({ "severity": priority_to_row_severity(priority) });
233        }
234        json!({ "severity": "normal" })
235    }
236}
237
/// Profile whose display values come from `systemd_field_display_value`
/// called with its last argument set to `false`.
238#[derive(Debug, Clone, Copy, Default)]
239pub struct SystemdJournalProfile;
240
/// Profile whose display values come from `systemd_field_display_value`
/// called with its last argument set to `true`.
/// NOTE(review): the name suggests compatibility with the Netdata
/// systemd-journal plugin output — confirm against that helper.
241#[derive(Debug, Clone, Copy, Default)]
242pub struct SystemdJournalPluginProfile;
243
244impl NetdataFunctionProfile for SystemdJournalProfile {
245    fn field_display_value(
246        &self,
247        context: &DisplayContext,
248        scope: DisplayScope,
249        field: &str,
250        value: &[u8],
251    ) -> Value {
252        systemd_field_display_value(context, scope, field, value, false)
253    }
254}
255
256impl NetdataFunctionProfile for SystemdJournalPluginProfile {
257    fn field_display_value(
258        &self,
259        context: &DisplayContext,
260        scope: DisplayScope,
261        field: &str,
262        value: &[u8],
263    ) -> Value {
264        systemd_field_display_value(context, scope, field, value, true)
265    }
266}
267
/// A Netdata-style journal function: a configuration plus a display profile.
/// The profile type defaults to `SystemdJournalProfile`.
268#[derive(Debug, Clone)]
269pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    /// Settings for this function (names, defaults, reader options, strategy).
270    config: NetdataFunctionConfig,
    /// Display hooks used when rows and facets are rendered.
271    profile: P,
272}
273
/// Progress snapshot delivered to `NetdataFunctionRunOptions::progress_callback`.
/// NOTE(review): the field meanings below are read off the names; the code
/// that fills them is not visible in this part of the file.
274#[derive(Debug, Clone)]
275pub struct NetdataFunctionProgress {
    /// Presumably the index (or ordinal) of the file being processed.
276    pub current_file: usize,
    /// Presumably the number of files selected for the run.
277    pub total_files: usize,
    /// Presumably the count of files opened and explored so far.
278    pub matched_files: u64,
    /// Presumably the count of files that could not be used so far.
279    pub skipped_files: u64,
    /// Explorer statistics accompanying this snapshot.
280    pub stats: ExplorerStats,
    /// Presumably the time elapsed since exploration started.
281    pub elapsed: Duration,
282}
283
/// Optional per-file metadata that a `NetdataFunctionState` can supply through
/// `file_metadata`. Every field is optional.
/// NOTE(review): field meanings and units are inferred from the names
/// (`_usec` presumably means microseconds) — confirm against the consumers.
284#[derive(Debug, Clone, Default, PartialEq, Eq)]
285pub struct NetdataJournalFileMetadata {
    /// Presumably a combination of the `NETDATA_SOURCE_TYPE_*` flags.
286    pub source_type: Option<u64>,
287    pub source_name: Option<String>,
288    pub file_last_modified_usec: Option<u64>,
289    pub msg_first_realtime_usec: Option<u64>,
290    pub msg_last_realtime_usec: Option<u64>,
291    pub journal_vs_realtime_delta_usec: Option<u64>,
292}
293
/// Caller-provided state that can be attached to a run through
/// `NetdataFunctionRunOptions::state`. Both methods have no-op defaults.
294pub trait NetdataFunctionState {
    /// Returns known metadata for `_path`; the default knows nothing.
295    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
296        None
297    }
298
    /// Receives a delta value for `_path`; the default discards it.
    /// NOTE(review): when this is called is decided outside this part of the file.
299    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
300}
301
/// Per-run options for the `run_directory_request_*_with_options` entry points.
302pub struct NetdataFunctionRunOptions<'a> {
    /// Time budget; `run_directory_request_json_with_options` turns it into an
    /// `Instant` deadline. `None` means no deadline.
303    pub timeout: Option<Duration>,
    /// Optional callback that receives `NetdataFunctionProgress` snapshots.
304    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional callback forwarded to `ExplorerControl::set_cancellation_callback`.
    /// NOTE(review): presumably returns `true` to request cancellation.
305    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller-owned state (see `NetdataFunctionState`).
306    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Interval forwarded to `ExplorerControl::set_progress_interval`.
307    pub progress_interval: Duration,
308}
309
310impl NetdataFunctionRunOptions<'_> {
311    pub fn from_timeout_seconds(seconds: u64) -> Self {
312        let seconds = if seconds == 0 {
313            EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
314        } else {
315            seconds
316        };
317        Self {
318            timeout: Some(Duration::from_secs(seconds)),
319            progress_callback: None,
320            cancellation_callback: None,
321            state: None,
322            progress_interval: Duration::from_millis(250),
323        }
324    }
325}
326
327impl Default for NetdataFunctionRunOptions<'_> {
328    fn default() -> Self {
329        Self {
330            timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
331            progress_callback: None,
332            cancellation_callback: None,
333            state: None,
334            progress_interval: Duration::from_millis(250),
335        }
336    }
337}
338
339impl NetdataJournalFunction<SystemdJournalProfile> {
340    pub fn systemd_journal() -> Self {
341        Self {
342            config: NetdataFunctionConfig::systemd_journal(),
343            profile: SystemdJournalProfile,
344        }
345    }
346}
347
348impl NetdataJournalFunction<SystemdJournalPluginProfile> {
349    pub fn systemd_journal_plugin_compatible() -> Self {
350        Self {
351            config: NetdataFunctionConfig::systemd_journal(),
352            profile: SystemdJournalPluginProfile,
353        }
354    }
355}
356
357impl<P> NetdataJournalFunction<P>
358where
359    P: NetdataFunctionProfile,
360{
361    pub fn new(mut config: NetdataFunctionConfig, profile: P) -> Self {
362        if config.source_selector_name.is_empty() {
363            config.source_selector_name = DEFAULT_SOURCE_SELECTOR_NAME.to_string();
364        }
365        if config.source_selector_help.is_empty() {
366            config.source_selector_help = DEFAULT_SOURCE_SELECTOR_HELP.to_string();
367        }
368        Self { config, profile }
369    }
370
371    pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
372        self.run_directory_request_json_with_options(
373            directory,
374            request,
375            NetdataFunctionRunOptions::default(),
376        )
377    }
378
379    pub fn run_directory_request_json_with_options(
380        &self,
381        directory: &Path,
382        request: &Value,
383        mut options: NetdataFunctionRunOptions<'_>,
384    ) -> Result<Value> {
385        let request = NetdataRequest::parse(request, &self.config)?;
386        let collection = collect_journal_files(directory)?;
387        let paths = collection.files;
388        if request.info {
389            return Ok(self.info_response(request.echo, &paths, &options));
390        }
391        let annotation_paths = paths.clone();
392
393        let selected =
394            select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
395        if let Some(response) = not_modified_before_scan_response(&request, &selected) {
396            return Ok(response);
397        }
398        let selected_files = selected.files;
399        let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
400        let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
401        self.finalize_combined_result(
402            &request,
403            &selected_files,
404            deadline,
405            &mut options,
406            &mut combined,
407            collection.skipped,
408            collection.errors,
409        )?;
410        Ok(self.query_response(request, &annotation_paths, combined))
411    }
412
413    fn finalize_combined_result(
414        &self,
415        request: &NetdataRequest,
416        selected_files: &[SelectedJournalFile],
417        deadline: Option<Instant>,
418        options: &mut NetdataFunctionRunOptions<'_>,
419        combined: &mut CombinedResult,
420        skipped_files: u64,
421        file_errors: Vec<String>,
422    ) -> Result<()> {
423        combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
424        combined.file_errors.extend(file_errors);
425        if combined.cancelled {
426            return Ok(());
427        }
428        if !request.data_only {
429            combined.add_zero_count_facet_values_from_files(
430                &request.facets,
431                self.config.reader_options,
432            );
433            combined.add_zero_count_selected_filter_values(request);
434        }
435        if should_collect_unfiltered_facet_vocabulary(request, combined) {
436            let vocabulary = self.explore_files(
437                selected_files,
438                &request.unfiltered_vocabulary(),
439                deadline,
440                options,
441            )?;
442            combined.add_zero_count_facet_values(&vocabulary.facets);
443        }
444        Ok(())
445    }
446
447    pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
448        self.run_directory_request_bytes_with_options(
449            directory,
450            request,
451            NetdataFunctionRunOptions::default(),
452        )
453    }
454
455    pub fn run_directory_request_bytes_with_options(
456        &self,
457        directory: &Path,
458        request: &[u8],
459        options: NetdataFunctionRunOptions<'_>,
460    ) -> Result<Value> {
461        let request: Value = serde_json::from_slice(request).map_err(|err| {
462            SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
463        })?;
464        self.run_directory_request_json_with_options(directory, &request, options)
465    }
466
467    fn explore_files(
468        &self,
469        files: &[SelectedJournalFile],
470        request: &NetdataRequest,
471        deadline: Option<Instant>,
472        options: &mut NetdataFunctionRunOptions<'_>,
473    ) -> Result<CombinedResult> {
474        let query = request.to_explorer_query(
475            files.len() as u64,
476            None,
477            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
478        );
479        let mut combined = CombinedResult::default();
480        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
481        combined.sampling_enabled = query.sampling.is_some();
482        let mut sampling_state =
483            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
484        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
485        let started = Instant::now();
486        let total_files = files.len();
487        for (file_index, file) in files.iter().enumerate() {
488            let path = &file.path;
489            if should_stop_before_file(&mut combined, deadline, options) {
490                break;
491            }
492            let Some(mut reader) = self.open_file_for_explore(
493                path,
494                &mut combined,
495                options,
496                progress_context(file_index, total_files, started),
497            )?
498            else {
499                continue;
500            };
501            combined.matched_files = combined.matched_files.saturating_add(1);
502            combined.matched_paths.push(path.clone());
503            let query = request.file_query(files.len(), reader.header(), &file.order);
504            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
505            let explored = self.explore_single_file(
506                &mut reader,
507                request,
508                &query,
509                deadline,
510                options,
511                &combined,
512                &page_window,
513                sampling_state.as_mut(),
514                &realtime_adjuster,
515                progress_context(file_index, total_files, started),
516                file.order.journal_vs_realtime_delta_usec,
517            );
518            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
519            else {
520                continue;
521            };
522            if finish_explored_file(
523                options,
524                request,
525                file,
526                &query,
527                result,
528                stop_reason,
529                &mut combined,
530                files,
531                file_index,
532                progress_context(file_index, total_files, started),
533            )? {
534                break;
535            }
536        }
537        combined.expand_row_payloads(self.config.reader_options);
538        combined.page_counters = Some(page_window.into_inner().counters());
539        Ok(combined)
540    }
541
542    fn open_file_for_explore(
543        &self,
544        path: &Path,
545        combined: &mut CombinedResult,
546        options: &mut NetdataFunctionRunOptions<'_>,
547        progress: ProgressContext,
548    ) -> Result<Option<FileReader>> {
549        match FileReader::open_with_options(path, self.config.reader_options) {
550            Ok(reader) => Ok(Some(reader)),
551            Err(err) => {
552                combined.skipped_files = combined.skipped_files.saturating_add(1);
553                combined
554                    .file_errors
555                    .push(format!("{}: {err}", path.display()));
556                emit_progress_for_combined(options, combined, progress);
557                Ok(None)
558            }
559        }
560    }
561
562    #[allow(clippy::too_many_arguments)]
563    fn explore_single_file(
564        &self,
565        reader: &mut FileReader,
566        request: &NetdataRequest,
567        query: &ExplorerQuery,
568        deadline: Option<Instant>,
569        options: &mut NetdataFunctionRunOptions<'_>,
570        combined: &CombinedResult,
571        page_window: &RefCell<NetdataPageWindow>,
572        sampling_state: Option<&mut ExplorerSamplingState>,
573        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
574        progress: ProgressContext,
575        realtime_delta_usec: u64,
576    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
577        let cancellation_callback = options.cancellation_callback;
578        let progress_interval = options.progress_interval;
579        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
580            emit_explorer_progress(options, combined, explorer_progress, progress);
581        };
582        let mut control = ExplorerControl::new();
583        control.set_deadline(deadline);
584        control.set_cancellation_callback(cancellation_callback);
585        control.set_progress_interval(progress_interval);
586        control.set_progress_callback(Some(&mut explorer_progress));
587        control.set_sampling_state(sampling_state);
588        let mut candidate_row =
589            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
590        control.set_candidate_row_callback(Some(&mut candidate_row));
591        let mut adjust_realtime =
592            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
593        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
594        let mut matched_row = |realtime_usec, rows_matched| {
595            delta_scan_can_stop(
596                request,
597                page_window,
598                realtime_usec,
599                rows_matched,
600                realtime_delta_usec,
601            )
602        };
603        control.set_matched_row_callback(Some(&mut matched_row));
604        let result = reader.explore_with_strategy_cursor_rows_controlled(
605            query,
606            self.config.explorer_strategy,
607            &mut control,
608        )?;
609        Ok((result, control.stop_reason()))
610    }
611
612    fn info_response(
613        &self,
614        echo: Value,
615        paths: &[PathBuf],
616        options: &NetdataFunctionRunOptions<'_>,
617    ) -> Value {
618        json!({
619            "_request": echo,
620            "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
621            "v": 3,
622            "accepted_params": self.accepted_params_from_fields(&[]),
623            "required_params": self.required_source_params(paths, options),
624            "show_ids": true,
625            "has_history": true,
626            "pagination": {
627                "enabled": true,
628                "key": "anchor",
629                "column": "timestamp",
630                "units": "timestamp_usec",
631            },
632            "status": 200,
633            "type": "table",
634            "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
635        })
636    }
637
638    fn query_response(
639        &self,
640        request: NetdataRequest,
641        annotation_paths: &[PathBuf],
642        combined: CombinedResult,
643    ) -> Value {
644        if combined.cancelled {
645            return netdata_function_error(499, "Request cancelled.");
646        }
647        let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
648        let mut response = base_query_response(&request, &combined, &artifacts);
649        let Some(object) = response.as_object_mut() else {
650            return netdata_function_error(500, "Internal Netdata function response error.");
651        };
652        self.add_query_response_metadata(object, &request, &combined, artifacts);
653        response
654    }
655
656    fn query_response_artifacts(
657        &self,
658        request: &NetdataRequest,
659        annotation_paths: &[PathBuf],
660        combined: &CombinedResult,
661    ) -> QueryResponseArtifacts {
662        let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
663        let reportable_facet_field_names = string_fields(&reportable_facet_fields);
664        let columns = self.build_columns(
665            &request,
666            &reportable_facet_fields,
667            &combined.rows,
668            &combined.facets,
669            &combined.column_fields,
670        );
671        let boot_ids = response_boot_ids(
672            &columns.order,
673            &combined.rows,
674            &combined.facets,
675            combined.histogram.as_ref(),
676        );
677        let context = DisplayContext {
678            boot_first_realtime: collect_boot_first_realtime(
679                annotation_paths,
680                self.config.reader_options,
681                &boot_ids,
682            ),
683            ..DisplayContext::default()
684        };
685        let data =
686            self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
687        let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
688        let histogram = self.query_histogram_artifact(request, combined, &context);
689        let message = query_message(combined.timed_out, &combined.stats);
690        let items = response_items(request, combined, data.len() as u64);
691        QueryResponseArtifacts {
692            reportable_facet_field_names,
693            columns,
694            data,
695            facets,
696            histogram,
697            message,
698            items,
699        }
700    }
701
702    fn add_query_response_metadata(
703        &self,
704        object: &mut Map<String, Value>,
705        request: &NetdataRequest,
706        combined: &CombinedResult,
707        artifacts: QueryResponseArtifacts,
708    ) {
709        if !request.data_only {
710            self.add_full_query_response_metadata(object, request, combined, &artifacts);
711        } else if request.histogram.is_some() {
712            object.insert(
713                "available_histograms".to_string(),
714                self.available_histograms(request, combined),
715            );
716        }
717        add_last_modified_if_needed(object, request, combined);
718        add_sampling_if_needed(object, combined);
719        add_analysis_outputs_if_needed(object, request, artifacts);
720    }
721
722    fn query_histogram_artifact(
723        &self,
724        request: &NetdataRequest,
725        combined: &CombinedResult,
726        context: &DisplayContext,
727    ) -> Option<Value> {
728        if let Some(histogram) = combined.histogram.as_ref() {
729            return Some(self.build_histogram(
730                context,
731                histogram,
732                combined.facets.get(&histogram.field),
733            ));
734        }
735        let histogram_field = request.histogram.as_ref()?;
736        if request.data_only && !request.delta {
737            return None;
738        }
739        let query = request.to_explorer_query(
740            combined.matched_files,
741            None,
742            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
743        );
744        let histogram = empty_histogram_for_query(histogram_field.as_bytes(), &query);
745        Some(self.build_histogram(
746            context,
747            &histogram,
748            combined.facets.get(histogram.field.as_slice()),
749        ))
750    }
751
752    fn add_full_query_response_metadata(
753        &self,
754        object: &mut Map<String, Value>,
755        request: &NetdataRequest,
756        combined: &CombinedResult,
757        artifacts: &QueryResponseArtifacts,
758    ) {
759        object.insert("message".to_string(), artifacts.message.clone());
760        object.insert("update_every".to_string(), Value::from(1));
761        object.insert("help".to_string(), Value::Null);
762        object.insert(
763            "accepted_params".to_string(),
764            self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
765        );
766        object.insert("default_sort_column".to_string(), Value::from("timestamp"));
767        object.insert("default_charts".to_string(), Value::Array(Vec::new()));
768        object.insert(
769            "available_histograms".to_string(),
770            self.available_histograms(request, combined),
771        );
772    }
773
774    fn build_columns(
775        &self,
776        request: &NetdataRequest,
777        reportable_facet_fields: &[Vec<u8>],
778        rows: &[LocatedRow],
779        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
780        column_fields: &BTreeSet<String>,
781    ) -> Columns {
782        let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
783        push_unique_many(&mut order, &self.config.default_view_keys);
784        push_unique_many(&mut order, &string_fields(reportable_facet_fields));
785        if let Some(histogram) = &request.histogram {
786            push_unique(&mut order, histogram);
787        }
788        for field in column_fields {
789            push_unique(&mut order, field);
790        }
791
792        for (field, values) in facets {
793            if !facet_group_is_reportable(values) {
794                continue;
795            }
796            push_unique(&mut order, &String::from_utf8_lossy(field));
797        }
798        for row in rows {
799            let fields = row_fields(row);
800            for field in fields.keys() {
801                push_unique(&mut order, field);
802            }
803        }
804
805        let mut map = Map::new();
806        for (index, key) in order.iter().enumerate() {
807            map.insert(key.clone(), column_metadata(key, index));
808        }
809        Columns { order, map }
810    }
811
812    fn build_data_rows(
813        &self,
814        context: &DisplayContext,
815        column_order: &[String],
816        rows: &[LocatedRow],
817        direction: Direction,
818    ) -> Vec<Value> {
819        let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
820            Direction::Forward => Box::new(rows.iter().rev()),
821            Direction::Backward => Box::new(rows.iter()),
822        };
823        row_iter
824            .map(|located| {
825                let fields = row_fields(located);
826                let mut row = Vec::with_capacity(column_order.len());
827                for column in column_order {
828                    let value = match column.as_str() {
829                        "timestamp" => Value::from(located.row.realtime_usec),
830                        "rowOptions" => self.profile.row_options(&fields),
831                        field => first_value(&fields, field)
832                            .map(|value| {
833                                self.profile.field_display_value(
834                                    context,
835                                    DisplayScope::Data,
836                                    field,
837                                    value,
838                                )
839                            })
840                            .unwrap_or(Value::Null),
841                    };
842                    row.push(value);
843                }
844                Value::Array(row)
845            })
846            .collect()
847    }
848
849    fn build_facets(
850        &self,
851        context: &DisplayContext,
852        requested: &[Vec<u8>],
853        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
854    ) -> Value {
855        let mut out = Vec::new();
856        for (order, field) in requested.iter().enumerate() {
857            let values = facets.get(field);
858            let field_name = String::from_utf8_lossy(field).into_owned();
859            let mut options: Vec<_> = values
860                .into_iter()
861                .flat_map(|values| values.iter())
862                .filter(|(value, count)| {
863                    (!value.is_empty() && value.as_slice() != b"-")
864                        || (**count == 0 && value.is_empty())
865                })
866                .map(|(value, count)| {
867                    if *count == 0 && value.is_empty() {
868                        return json!({
869                            "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
870                            "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
871                            "count": count,
872                        });
873                    }
874                    json!({
875                        "id": String::from_utf8_lossy(value).into_owned(),
876                        "name": self.profile.facet_option_name(context, &field_name, value),
877                        "count": count,
878                    })
879                })
880                .collect();
881            sort_facet_options(&field_name, &mut options);
882            for (idx, option) in options.iter_mut().enumerate() {
883                if let Some(object) = option.as_object_mut() {
884                    object.insert("order".to_string(), Value::from((idx + 1) as u64));
885                }
886            }
887            out.push(json!({
888                "id": field_name,
889                "name": String::from_utf8_lossy(field).into_owned(),
890                "order": order + 1,
891                "options": options,
892            }));
893        }
894        Value::Array(out)
895    }
896
897    fn build_histogram(
898        &self,
899        context: &DisplayContext,
900        histogram: &ExplorerHistogram,
901        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
902    ) -> Value {
903        let field = String::from_utf8_lossy(&histogram.field).into_owned();
904        let mut dimension_ids = BTreeSet::new();
905        let mut buckets = Vec::with_capacity(histogram.buckets.len());
906        for bucket in &histogram.buckets {
907            let mut values = BTreeMap::new();
908            for (value, count) in &bucket.values {
909                add_netdata_facet_count(&mut values, value, *count);
910            }
911            for value in values.keys() {
912                dimension_ids.insert(value.clone());
913            }
914            buckets.push((bucket.start_realtime_usec, values));
915        }
916        let actual_dimension_ids = dimension_ids.clone();
917        if let Some(known_values) = known_values {
918            for value in known_values.keys() {
919                if value.is_empty() || value.as_slice() == b"-" {
920                    continue;
921                }
922                dimension_ids.insert(value.clone());
923            }
924        }
925        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
926        let metadata = self.histogram_chart_metadata(
927            context,
928            &field,
929            &buckets,
930            &actual_dimension_ids,
931            &dimension_ids,
932        );
933        let data: Vec<Value> = buckets
934            .iter()
935            .map(|(start_realtime_usec, values)| {
936                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
937                point.push(Value::from(start_realtime_usec / 1000));
938                for value in &dimension_ids {
939                    let count = values
940                        .get(value)
941                        .copied()
942                        .map(Value::from)
943                        .unwrap_or_else(|| {
944                            if actual_dimension_ids.contains(value) {
945                                Value::from(0)
946                            } else {
947                                Value::Null
948                            }
949                        });
950                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
951                }
952                Value::Array(point)
953            })
954            .collect();
955
956        json!({
957            "id": field,
958            "name": field,
959            "chart": {
960                "summary": metadata.summary,
961                "totals": metadata.totals,
962                "result": {
963                    "labels": metadata.result_labels,
964                    "point": { "value": 0, "arp": 1, "pa": 2 },
965                    "data": data,
966                },
967                "db": {
968                    "tiers": 1,
969                    "update_every": histogram_update_every_seconds(histogram),
970                    "units": "events",
971                    "dimensions": {
972                        "ids": metadata.ids.clone(),
973                        "names": metadata.names.clone(),
974                        "units": metadata.units.clone(),
975                        "sts": metadata.stats.clone(),
976                    },
977                    "per_tier": [{
978                        "tier": 0,
979                        "queries": 1,
980                        "points": metadata.points,
981                        "update_every": histogram_update_every_seconds(histogram),
982                    }],
983                },
984                "view": {
985                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
986                    "update_every": histogram_update_every_seconds(histogram),
987                    "after": histogram_after_seconds(histogram),
988                    "before": histogram_before_seconds(histogram),
989                    "units": "events",
990                    "chart_type": "stackedBar",
991                    "dimensions": {
992                        "grouped_by": ["dimension"],
993                        "ids": metadata.ids,
994                        "names": metadata.names,
995                        "colors": metadata.colors,
996                        "units": metadata.units,
997                        "sts": metadata.stats,
998                    },
999                    "min": metadata.min,
1000                    "max": metadata.max,
1001                },
1002                "agents": [{
1003                    "mg": "default",
1004                    "nm": "facets.histogram",
1005                    "now": now_unix_seconds(),
1006                    "ai": 0,
1007                }],
1008            }
1009        })
1010    }
1011
1012    fn histogram_chart_metadata(
1013        &self,
1014        context: &DisplayContext,
1015        field: &str,
1016        buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
1017        actual_dimensions: &BTreeSet<Vec<u8>>,
1018        dimensions: &[Vec<u8>],
1019    ) -> HistogramChartMetadata {
1020        let mut ids = Vec::with_capacity(dimensions.len());
1021        let mut names = Vec::with_capacity(dimensions.len());
1022        let mut colors = Vec::with_capacity(dimensions.len());
1023        let mut units = Vec::with_capacity(dimensions.len());
1024        let mut min_values = Vec::with_capacity(dimensions.len());
1025        let mut max_values = Vec::with_capacity(dimensions.len());
1026        let mut avg_values = Vec::with_capacity(dimensions.len());
1027        let mut arp_values = Vec::with_capacity(dimensions.len());
1028        let mut con_values = Vec::with_capacity(dimensions.len());
1029        let mut summary_dimensions = Vec::with_capacity(dimensions.len());
1030        let mut result_labels = vec![Value::String("time".to_string())];
1031
1032        let total: u64 = dimensions
1033            .iter()
1034            .map(|dimension| {
1035                buckets
1036                    .iter()
1037                    .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
1038                    .sum::<u64>()
1039            })
1040            .sum();
1041
1042        let mut min = 0;
1043        let mut max = 0;
1044        let mut points = 0;
1045        for (priority, dimension) in dimensions.iter().enumerate() {
1046            let id = String::from_utf8_lossy(dimension).into_owned();
1047            let display = match self.profile.field_display_value(
1048                context,
1049                DisplayScope::Histogram,
1050                field,
1051                dimension,
1052            ) {
1053                Value::String(value) => value,
1054                other => other.to_string(),
1055            };
1056            let (dimension_min, dimension_max, dimension_sum, actual) =
1057                histogram_dimension_stats(buckets, actual_dimensions, dimension);
1058            let dimension_average = if actual && !buckets.is_empty() {
1059                dimension_sum as f64 / buckets.len() as f64
1060            } else {
1061                0.0
1062            };
1063            if actual {
1064                if points == 0 || dimension_min < min {
1065                    min = dimension_min;
1066                }
1067                if dimension_max > max {
1068                    max = dimension_max;
1069                }
1070                points += buckets.len() as u64;
1071            }
1072            let contribution = if total > 0 {
1073                dimension_sum as f64 * 100.0 / total as f64
1074            } else {
1075                0.0
1076            };
1077
1078            ids.push(Value::String(id.clone()));
1079            names.push(Value::String(display.clone()));
1080            colors.push(Value::Null);
1081            units.push(Value::String("events".to_string()));
1082            result_labels.push(Value::String(display.clone()));
1083            min_values.push(Value::from(dimension_min));
1084            max_values.push(Value::from(dimension_max));
1085            avg_values.push(Value::from(dimension_average));
1086            arp_values.push(Value::from(0));
1087            con_values.push(Value::from(contribution));
1088            summary_dimensions.push(json!({
1089                "id": id,
1090                "nm": display,
1091                "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
1092                "sts": {
1093                    "min": dimension_min,
1094                    "max": dimension_max,
1095                    "avg": dimension_average,
1096                    "con": contribution,
1097                },
1098                "pri": priority as u64,
1099            }));
1100        }
1101
1102        let stats = json!({
1103            "min": min_values,
1104            "max": max_values,
1105            "avg": avg_values,
1106            "arp": arp_values,
1107            "con": con_values,
1108        });
1109        let summary_stats = json!({
1110            "min": min,
1111            "max": max,
1112            "avg": histogram_average(total, points),
1113            "con": 100.0,
1114        });
1115        let dimensions_len = dimensions.len() as u64;
1116
1117        HistogramChartMetadata {
1118            ids,
1119            names,
1120            colors,
1121            units,
1122            stats,
1123            summary: json!({
1124                "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
1125                "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
1126                "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
1127                "dimensions": summary_dimensions,
1128                "labels": [],
1129                "alerts": [],
1130            }),
1131            totals: histogram_totals(dimensions_len),
1132            result_labels,
1133            min,
1134            max,
1135            points,
1136        }
1137    }
1138
1139    fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1140        NETDATA_ACCEPTED_PARAMS
1141            .iter()
1142            .copied()
1143            .chain(fields.iter().map(String::as_str))
1144            .map(|field| Value::String(field.to_string()))
1145            .collect()
1146    }
1147
1148    fn required_source_params(
1149        &self,
1150        paths: &[PathBuf],
1151        options: &NetdataFunctionRunOptions<'_>,
1152    ) -> Value {
1153        let mut all = JournalSourceSummary::default();
1154        let mut local = JournalSourceSummary::default();
1155        let mut local_namespaces = JournalSourceSummary::default();
1156        let mut local_system = JournalSourceSummary::default();
1157        let mut local_user = JournalSourceSummary::default();
1158        let mut remote = JournalSourceSummary::default();
1159        let mut other = JournalSourceSummary::default();
1160        let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1161
1162        for path in paths {
1163            let metadata = file_metadata(options, path);
1164            let source_type = metadata
1165                .as_ref()
1166                .and_then(|metadata| metadata.source_type)
1167                .unwrap_or_else(|| journal_file_source_type(path));
1168            all.add_path(path, self.config.reader_options, metadata.as_ref());
1169            if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1170                local.add_path(path, self.config.reader_options, metadata.as_ref());
1171            }
1172            if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1173                local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1174            }
1175            if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1176                local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1177            }
1178            if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1179                local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1180            }
1181            if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1182                remote.add_path(path, self.config.reader_options, metadata.as_ref());
1183            }
1184            if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1185                other.add_path(path, self.config.reader_options, metadata.as_ref());
1186            }
1187            let source_name = metadata
1188                .as_ref()
1189                .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1190                .or_else(|| journal_file_exact_source_name(path));
1191            if let Some(source_name) = source_name {
1192                exact.entry(source_name).or_default().add_path(
1193                    path,
1194                    self.config.reader_options,
1195                    metadata.as_ref(),
1196                );
1197            }
1198        }
1199
1200        let mut source_options = Vec::new();
1201        push_source_option(&mut source_options, "all", &all);
1202        push_source_option(&mut source_options, "all-local-logs", &local);
1203        push_source_option(
1204            &mut source_options,
1205            "all-local-namespaces",
1206            &local_namespaces,
1207        );
1208        push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1209        push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1210        push_source_option(&mut source_options, "all-remote-systems", &remote);
1211        push_source_option(&mut source_options, "all-uncategorized", &other);
1212        for (name, summary) in exact {
1213            push_source_option(&mut source_options, &name, &summary);
1214        }
1215
1216        json!([{
1217            "id": "__logs_sources",
1218            "name": self.config.source_selector_name.as_str(),
1219            "help": self.config.source_selector_help.as_str(),
1220            "type": "multiselect",
1221            "options": source_options,
1222        }])
1223    }
1224
1225    fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1226        let mut fields = combined.reportable_facet_fields(&request.facets);
1227        if request.data_only {
1228            if let Some(histogram) = &request.histogram {
1229                push_unique(&mut fields, histogram);
1230            }
1231        }
1232        let mut sorted = fields.clone();
1233        sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1234        let order_by_field: BTreeMap<String, usize> = sorted
1235            .into_iter()
1236            .enumerate()
1237            .map(|(index, field)| (field, index + 1))
1238            .collect();
1239
1240        fields
1241            .into_iter()
1242            .map(|field| {
1243                let order = order_by_field.get(&field).copied().unwrap_or(0);
1244                json!({
1245                    "id": field,
1246                    "name": field,
1247                    "order": order,
1248                })
1249            })
1250            .collect()
1251    }
1252}
1253
1254struct HistogramChartMetadata {
1255    ids: Vec<Value>,
1256    names: Vec<Value>,
1257    colors: Vec<Value>,
1258    units: Vec<Value>,
1259    stats: Value,
1260    summary: Value,
1261    totals: Value,
1262    result_labels: Vec<Value>,
1263    min: u64,
1264    max: u64,
1265    points: u64,
1266}
1267
/// Returns `(min, max, sum, actual)` of one dimension's per-bucket counts.
///
/// A bucket without the dimension counts as zero. `actual` is `false`, and all three
/// numbers are zero, when `dimension` is not in `actual_dimensions`. With no buckets
/// at all the numbers are zero as well.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }

    let mut counts = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0));
    let Some(first) = counts.next() else {
        return (0, 0, 0, true);
    };
    // Seed every accumulator with the first bucket, then fold in the rest.
    let (lowest, highest, total) = counts.fold(
        (first, first, first),
        |(lowest, highest, total), count| (lowest.min(count), highest.max(count), total + count),
    );
    (lowest, highest, total, true)
}
1291
/// Returns `total / points` as a float, or `0.0` when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        count => total as f64 / count as f64,
    }
}
1299
1300fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1301    let mut node = json!({
1302        "mg": "default",
1303        "nm": "facets.histogram",
1304        "ni": 0,
1305        "st": { "ai": 0, "code": 200, "msg": "" },
1306    });
1307    add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1308    node
1309}
1310
1311fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1312    let mut context = json!({ "id": "facets.histogram" });
1313    add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1314    context
1315}
1316
1317fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1318    let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1319    add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1320    instance
1321}
1322
1323fn add_histogram_summary_shape(
1324    object: &mut Value,
1325    dimensions: u64,
1326    points: u64,
1327    stats: &Value,
1328    include_instances: bool,
1329) {
1330    let Some(map) = object.as_object_mut() else {
1331        return;
1332    };
1333    if dimensions > 0 {
1334        if include_instances {
1335            map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1336        }
1337        map.insert(
1338            "ds".to_string(),
1339            json!({ "sl": dimensions, "qr": dimensions }),
1340        );
1341    }
1342    if points > 0 {
1343        map.insert("sts".to_string(), stats.clone());
1344    }
1345}
1346
1347fn histogram_totals(dimensions: u64) -> Value {
1348    let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1349    if dimensions > 0 {
1350        if let Some(map) = totals.as_object_mut() {
1351            map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1352            map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1353            map.insert(
1354                "dimensions".to_string(),
1355                json!({ "sl": dimensions, "qr": dimensions }),
1356            );
1357        }
1358    }
1359    totals
1360}
1361
/// Maps `true` to `1` and `false` to `0`.
fn bool_to_u64(value: bool) -> u64 {
    u64::from(value)
}
1365
1366fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1367    histogram
1368        .buckets
1369        .first()
1370        .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1371        .unwrap_or(0)
1372}
1373
1374fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1375    histogram
1376        .buckets
1377        .last()
1378        .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1379        .unwrap_or(0)
1380}
1381
/// Current wall-clock time as whole seconds since the Unix epoch, or `0` when the
/// system clock reports a time before the epoch.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1388
1389#[derive(Debug, Clone)]
1390struct NetdataRequest {
1391    info: bool,
1392    echo: Value,
1393    after_realtime_usec: Option<u64>,
1394    before_realtime_usec: Option<u64>,
1395    if_modified_since_usec: u64,
1396    anchor: ExplorerAnchor,
1397    direction: Direction,
1398    limit: usize,
1399    data_only: bool,
1400    delta: bool,
1401    tail: bool,
1402    sampling: u64,
1403    source_type: u64,
1404    exact_sources: Vec<String>,
1405    filters: Vec<ExplorerFilter>,
1406    facets: Vec<Vec<u8>>,
1407    histogram: Option<String>,
1408    fts_terms: Vec<ExplorerFtsPattern>,
1409    fts_patterns: Vec<Vec<u8>>,
1410    fts_negative_patterns: Vec<Vec<u8>>,
1411}
1412
1413impl NetdataRequest {
1414    fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1415        let object = value.as_object().ok_or_else(|| {
1416            SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1417        })?;
1418        let now_seconds = unix_now_seconds();
1419        let info = get_bool(object, "info").unwrap_or(false);
1420        let after = get_i64(object, "after");
1421        let before = get_i64(object, "before");
1422        let (after_realtime_usec, before_realtime_usec) =
1423            normalize_time_window(now_seconds, after, before);
1424        let direction = request_direction(object);
1425        let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1426        let data_only = get_bool(object, "data_only").unwrap_or(false);
1427        let delta = request_delta(data_only, object);
1428        let tail = request_tail(data_only, if_modified_since_usec, object);
1429        let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1430        let (anchor, direction) = request_anchor_and_direction(
1431            object,
1432            tail,
1433            direction,
1434            after_realtime_usec,
1435            before_realtime_usec,
1436        );
1437        let requested_limit = request_limit(object);
1438        let limit = requested_limit.max(2);
1439        let requested_facets = parse_string_array(object.get("facets"));
1440        let facets = request_facets(&requested_facets, config);
1441        let requested_histogram = request_histogram(object);
1442        let histogram = request_histogram_or_default(&requested_histogram, config);
1443        let requested_query = request_query(object);
1444        let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1445            .as_deref()
1446            .map(parse_fts_query_patterns)
1447            .unwrap_or_default();
1448        let source_selection = parse_source_selection(object.get("selections"));
1449        let filters = parse_filters(object.get("selections"));
1450
1451        let echo_input = RequestEchoInput {
1452            info,
1453            after_realtime_usec,
1454            before_realtime_usec,
1455            if_modified_since_usec,
1456            anchor,
1457            direction,
1458            limit: requested_limit,
1459            data_only,
1460            delta,
1461            tail,
1462            sampling,
1463            source_type: source_selection.source_type,
1464            requested_facets: requested_facets.as_deref(),
1465            selections: object.get("selections"),
1466            histogram: requested_histogram.as_deref(),
1467            query: requested_query.as_deref(),
1468        };
1469        let echo = normalized_request_echo(&echo_input);
1470
1471        Ok(Self {
1472            info,
1473            echo,
1474            after_realtime_usec,
1475            before_realtime_usec,
1476            if_modified_since_usec,
1477            anchor,
1478            direction,
1479            limit,
1480            data_only,
1481            delta,
1482            tail,
1483            sampling,
1484            source_type: source_selection.source_type,
1485            exact_sources: source_selection.exact_sources,
1486            filters,
1487            facets,
1488            histogram,
1489            fts_terms,
1490            fts_patterns,
1491            fts_negative_patterns,
1492        })
1493    }
1494
1495    fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1496        if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1497            return true;
1498        }
1499        if self.source_type & SOURCE_TYPE_ALL != 0 {
1500            return true;
1501        }
1502        let file_source_type = metadata
1503            .and_then(|metadata| metadata.source_type)
1504            .unwrap_or_else(|| journal_file_source_type(path));
1505        if file_source_type & self.source_type != 0 {
1506            return true;
1507        }
1508        if self.exact_sources.is_empty() {
1509            return false;
1510        }
1511        let source_name = metadata
1512            .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1513            .or_else(|| journal_file_exact_source_name(path));
1514        self.exact_sources
1515            .iter()
1516            .any(|source| source_name.as_deref() == Some(source.as_str()))
1517    }
1518
1519    fn to_explorer_query(
1520        &self,
1521        matched_files: u64,
1522        file_header: Option<FileHeader>,
1523        realtime_slack_usec: u64,
1524    ) -> ExplorerQuery {
1525        let analysis_enabled = !self.data_only || self.delta;
1526        let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1527        let backward_page_anchor = self.data_only
1528            && !tail_anchor
1529            && self.direction == Direction::Backward
1530            && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1531        let after_realtime_usec = if tail_anchor {
1532            tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
1533        } else {
1534            self.after_realtime_usec
1535        };
1536        let before_realtime_usec = if backward_page_anchor {
1537            before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
1538        } else {
1539            self.before_realtime_usec
1540        };
1541        let anchor = if tail_anchor || backward_page_anchor {
1542            ExplorerAnchor::Auto
1543        } else {
1544            self.anchor
1545        };
1546        let sampling = (analysis_enabled
1547            && self.sampling != 0
1548            && matched_files != 0
1549            && after_realtime_usec.is_some()
1550            && before_realtime_usec.is_some())
1551        .then(|| {
1552            let header = file_header.unwrap_or(FileHeader {
1553                signature: [0; 8],
1554                compatible_flags: 0,
1555                incompatible_flags: 0,
1556                state: 0,
1557                header_size: 0,
1558                n_entries: 0,
1559                head_entry_realtime: 0,
1560                tail_entry_realtime: 0,
1561                head_entry_seqnum: 0,
1562                tail_entry_seqnum: 0,
1563                tail_entry_boot_id: [0; 16],
1564                seqnum_id: [0; 16],
1565            });
1566            let messages_in_file = header
1567                .tail_entry_seqnum
1568                .checked_sub(header.head_entry_seqnum)
1569                .and_then(|span| span.checked_add(1))
1570                .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1571                .unwrap_or(header.n_entries);
1572            ExplorerSampling {
1573                budget: self.sampling,
1574                matched_files,
1575                file_head_realtime_usec: header.head_entry_realtime,
1576                file_tail_realtime_usec: header.tail_entry_realtime,
1577                file_head_seqnum: header.head_entry_seqnum,
1578                file_tail_seqnum: header.tail_entry_seqnum,
1579                file_entries: messages_in_file,
1580            }
1581        });
1582        ExplorerQuery {
1583            after_realtime_usec,
1584            before_realtime_usec,
1585            anchor,
1586            direction: self.direction,
1587            limit: self.limit,
1588            filters: self.filters.clone(),
1589            facets: analysis_enabled
1590                .then(|| self.facets.clone())
1591                .unwrap_or_default(),
1592            histogram: analysis_enabled
1593                .then(|| {
1594                    self.histogram
1595                        .as_ref()
1596                        .map(|field| field.as_bytes().to_vec())
1597                })
1598                .flatten(),
1599            histogram_after_realtime_usec: self.after_realtime_usec,
1600            histogram_before_realtime_usec: self.before_realtime_usec,
1601            histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1602            fts_terms: self.fts_terms.clone(),
1603            fts_patterns: self.fts_patterns.clone(),
1604            fts_negative_patterns: self.fts_negative_patterns.clone(),
1605            field_mode: ExplorerFieldMode::FirstValue,
1606            exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1607            use_source_realtime: true,
1608            realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1609            stop_when_rows_full: self.data_only && !tail_anchor,
1610            stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1611            sampling,
1612            debug_collect_column_fields_by_row_traversal: false,
1613        }
1614    }
1615
1616    fn file_query(
1617        &self,
1618        matched_files: usize,
1619        file_header: FileHeader,
1620        order: &JournalFileOrderInfo,
1621    ) -> ExplorerQuery {
1622        let mut query = self.to_explorer_query(
1623            matched_files as u64,
1624            Some(file_header),
1625            order.journal_vs_realtime_delta_usec,
1626        );
1627        if self.data_only && self.delta {
1628            query.stop_when_rows_full = false;
1629        }
1630        query
1631    }
1632
1633    fn unfiltered_vocabulary(&self) -> Self {
1634        let mut request = self.clone();
1635        request.filters.clear();
1636        request.histogram = None;
1637        request.limit = 0;
1638        request.fts_terms.clear();
1639        request.fts_patterns.clear();
1640        request.fts_negative_patterns.clear();
1641        request
1642    }
1643
1644    fn distinct_filter_fields(&self) -> usize {
1645        self.filters
1646            .iter()
1647            .map(|filter| filter.field.as_slice())
1648            .collect::<HashSet<_>>()
1649            .len()
1650    }
1651}
1652
1653#[derive(Debug, Clone)]
1654struct LocatedRow {
1655    file_path: PathBuf,
1656    row: ExplorerRow,
1657}
1658
/// State used to hand out adjusted realtime timestamps when consecutive
/// inputs fall inside the span that was already handed out.
#[derive(Debug)]
struct NetdataRealtimeAdjuster {
    /// Direction in which colliding timestamps are pushed: backward decrements,
    /// forward increments.
    direction: Direction,
    /// Inclusive lower edge of the span of timestamps already handed out.
    last_realtime_from: u64,
    /// Inclusive upper edge of the span of timestamps already handed out.
    last_realtime_to: u64,
}
1665
1666impl NetdataRealtimeAdjuster {
1667    fn new(direction: Direction) -> Self {
1668        Self {
1669            direction,
1670            last_realtime_from: 0,
1671            last_realtime_to: 0,
1672        }
1673    }
1674
1675    fn adjust(&mut self, realtime_usec: u64) -> u64 {
1676        match self.direction {
1677            Direction::Backward => {
1678                if realtime_usec >= self.last_realtime_from
1679                    && realtime_usec <= self.last_realtime_to
1680                {
1681                    self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1682                    self.last_realtime_from
1683                } else {
1684                    self.last_realtime_from = realtime_usec;
1685                    self.last_realtime_to = realtime_usec;
1686                    realtime_usec
1687                }
1688            }
1689            Direction::Forward => {
1690                if realtime_usec >= self.last_realtime_from
1691                    && realtime_usec <= self.last_realtime_to
1692                {
1693                    self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1694                    self.last_realtime_to
1695                } else {
1696                    self.last_realtime_from = realtime_usec;
1697                    self.last_realtime_to = realtime_usec;
1698                    realtime_usec
1699                }
1700            }
1701        }
1702    }
1703}
1704
/// Aggregate description of the journal files that make up one source.
#[derive(Debug, Default)]
struct JournalSourceSummary {
    /// Number of added paths whose filesystem metadata could be read.
    files: u64,
    /// Sum of the on-disk sizes (bytes) of those files.
    total_size: u64,
    /// Smallest first-entry realtime timestamp seen so far, in microseconds.
    first_realtime_usec: Option<u64>,
    /// Largest last-entry realtime timestamp seen so far, in microseconds.
    last_realtime_usec: Option<u64>,
}
1712
1713impl JournalSourceSummary {
1714    #[cfg(test)]
1715    fn from_paths(
1716        paths: &[PathBuf],
1717        reader_options: ReaderOptions,
1718        options: &NetdataFunctionRunOptions<'_>,
1719    ) -> Self {
1720        let mut summary = Self::default();
1721        for path in paths {
1722            let metadata = file_metadata(options, path);
1723            summary.add_path(path, reader_options, metadata.as_ref());
1724        }
1725        summary
1726    }
1727
1728    fn add_path(
1729        &mut self,
1730        path: &Path,
1731        reader_options: ReaderOptions,
1732        metadata: Option<&NetdataJournalFileMetadata>,
1733    ) {
1734        if let Ok(metadata) = std::fs::metadata(path) {
1735            self.files = self.files.saturating_add(1);
1736            self.total_size = self.total_size.saturating_add(metadata.len());
1737        }
1738        if let Some(metadata) = metadata {
1739            let metadata_first = metadata.msg_first_realtime_usec;
1740            let metadata_last = metadata.msg_last_realtime_usec;
1741            if let Some(first) = metadata_first {
1742                self.first_realtime_usec = Some(
1743                    self.first_realtime_usec
1744                        .map_or(first, |current| current.min(first)),
1745                );
1746            }
1747            if let Some(last) = metadata_last {
1748                self.last_realtime_usec = Some(
1749                    self.last_realtime_usec
1750                        .map_or(last, |current| current.max(last)),
1751                );
1752            }
1753            if metadata_first.is_some() && metadata_last.is_some() {
1754                return;
1755            }
1756        }
1757        let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1758            return;
1759        };
1760        let header = reader.header();
1761        if header.head_entry_realtime != 0 {
1762            self.first_realtime_usec = Some(
1763                self.first_realtime_usec
1764                    .map_or(header.head_entry_realtime, |current| {
1765                        current.min(header.head_entry_realtime)
1766                    }),
1767            );
1768        }
1769        if header.tail_entry_realtime != 0 {
1770            self.last_realtime_usec = Some(
1771                self.last_realtime_usec
1772                    .map_or(header.tail_entry_realtime, |current| {
1773                        current.max(header.tail_entry_realtime)
1774                    }),
1775            );
1776        }
1777    }
1778
1779    fn info(&self) -> String {
1780        let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1781            (Some(first), Some(last)) if last > first && (last - first) >= 1_000_000 => {
1782                human_duration_seconds((last - first) / 1_000_000)
1783            }
1784            _ => "off".to_string(),
1785        };
1786        let last_entry = self
1787            .last_realtime_usec
1788            .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1789            .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1790            .unwrap_or_else(|| "unknown".to_string());
1791        format!(
1792            "{} files, total size {}, covering {}, last entry at {}",
1793            self.files,
1794            human_binary_size(self.total_size),
1795            coverage,
1796            last_entry
1797        )
1798    }
1799}
1800
1801fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1802    if summary.files == 0 {
1803        return;
1804    }
1805    target.push(json!({
1806        "id": id,
1807        "name": id,
1808        "info": summary.info(),
1809        "pill": human_binary_size(summary.total_size),
1810    }));
1811}
1812
1813fn expand_located_row_payloads(
1814    located: &mut LocatedRow,
1815    reader_options: ReaderOptions,
1816) -> Result<()> {
1817    let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1818    reader.seek_cursor(&located.row.cursor)?;
1819    if !reader.test_cursor(&located.row.cursor)? {
1820        return Err(SdkError::InvalidCursor(format!(
1821            "selected row cursor is no longer available: {}",
1822            located.row.cursor
1823        )));
1824    }
1825    reader.collect_entry_payloads(&mut located.row.payloads)
1826}
1827
/// Accumulated outcome of a query across all journal files.
#[derive(Debug, Default)]
struct CombinedResult {
    /// Rows kept so far, each tagged with its source file; re-sorted and
    /// truncated to the page limit after every merge.
    rows: Vec<LocatedRow>,
    /// Facet counters: field name -> value -> count.
    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
    /// Histogram merged from the per-file results, if any produced one.
    histogram: Option<ExplorerHistogram>,
    /// Names of the fields seen as columns (only valid UTF-8 names are kept).
    column_fields: BTreeSet<String>,
    /// Explorer statistics summed over all merged files.
    stats: ExplorerStats,
    /// Pagination counters; not populated in this part of the file.
    page_counters: Option<NetdataPageCounters>,
    /// Reported as `_journal_files.matched` in the response.
    matched_files: u64,
    /// Paths used when looking up zero-count facet values in the files.
    matched_paths: Vec<PathBuf>,
    /// Reported as `_journal_files.skipped` in the response.
    skipped_files: u64,
    /// Human-readable per-file / per-row error messages.
    file_errors: Vec<String>,
    /// Set when some data could not be produced (e.g. a row failed to expand).
    partial: bool,
    /// NOTE(review): presumably set when the query hit its timeout — confirm at the writer.
    timed_out: bool,
    /// NOTE(review): presumably set when the query was cancelled — confirm at the writer.
    cancelled: bool,
    /// NOTE(review): presumably records whether sampling was active — confirm at the writer.
    sampling_enabled: bool,
}
1845
/// Snapshot of the pagination counters produced by `NetdataPageWindow::counters`.
#[derive(Clone, Copy, Debug, Default)]
struct NetdataPageCounters {
    /// Rows that passed the anchor check and were offered to the page.
    matched: u64,
    /// Rows counted on the "before" side of the page.
    before: u64,
    /// Rows counted on the "after" side of the page, including rows that were
    /// pushed out of the page by a later row.
    after: u64,
}
1852
/// Position and timing information for progress reporting.
// NOTE(review): field meanings below are inferred from the names; the code
// that fills them is outside this part of the file — confirm there.
#[derive(Clone, Copy)]
struct ProgressContext {
    /// Presumably the index of the file currently being processed.
    current_file: usize,
    /// Presumably the number of files the query will process.
    total_files: usize,
    /// Instant captured at the start of the measured work.
    started: Instant,
}
1859
/// Heap of retained row timestamps, shaped so that the heap top is always the
/// entry that gets evicted first for the given direction.
#[derive(Debug)]
enum NetdataPageHeap {
    /// Backward paging: `Reverse` turns the max-heap into a min-heap, so the
    /// top is the oldest retained timestamp.
    Backward(BinaryHeap<Reverse<u64>>),
    /// Forward paging: plain max-heap, so the top is the newest retained
    /// timestamp.
    Forward(BinaryHeap<u64>),
}
1865
/// Tracks which row timestamps belong to the requested page and counts the
/// rows that fall before or after it.
#[derive(Debug)]
struct NetdataPageWindow {
    /// Paging direction of the request.
    direction: Direction,
    /// Realtime anchor of a non-tail request; rows at or beyond it are skipped.
    anchor_start_usec: Option<u64>,
    /// Realtime anchor of a tail request; rows at or beyond it are skipped.
    anchor_stop_usec: Option<u64>,
    /// Maximum number of timestamps retained (the page size).
    limit: usize,
    /// Retained timestamps, organized for cheap eviction.
    heap: NetdataPageHeap,
    /// Smallest timestamp currently retained.
    oldest_retained_usec: Option<u64>,
    /// Largest timestamp currently retained.
    newest_retained_usec: Option<u64>,
    /// Rows that passed the anchor check while the limit was non-zero.
    matched: u64,
    /// Rows counted on the "before" side of the page.
    skips_before: u64,
    /// Rows counted on the "after" side of the page.
    skips_after: u64,
    /// Number of times a retained row was replaced by a better candidate.
    shifts: u64,
}
1880
1881impl NetdataPageWindow {
1882    fn for_request(request: &NetdataRequest) -> Self {
1883        let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1884            (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1885            (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1886            _ => (None, None),
1887        };
1888        let heap = match request.direction {
1889            Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1890            Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1891        };
1892        Self {
1893            direction: request.direction,
1894            anchor_start_usec,
1895            anchor_stop_usec,
1896            limit: request.limit,
1897            heap,
1898            oldest_retained_usec: None,
1899            newest_retained_usec: None,
1900            matched: 0,
1901            skips_before: 0,
1902            skips_after: if request.tail
1903                && request.delta
1904                && matches!(request.anchor, ExplorerAnchor::Realtime(_))
1905            {
1906                1
1907            } else {
1908                0
1909            },
1910            shifts: 0,
1911        }
1912    }
1913
1914    fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1915        if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1916            return false;
1917        }
1918        if self.retained_len() < self.limit {
1919            return true;
1920        }
1921        self.oldest_retained_usec
1922            .zip(self.newest_retained_usec)
1923            .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1924    }
1925
1926    fn observe(&mut self, realtime_usec: u64) {
1927        if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1928            return;
1929        }
1930        self.matched = self.matched.saturating_add(1);
1931        match (&mut self.heap, self.direction) {
1932            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1933                if heap.len() < self.limit {
1934                    heap.push(Reverse(realtime_usec));
1935                    self.add_retained_bound(realtime_usec);
1936                    return;
1937                }
1938                let Some(Reverse(oldest)) = heap.peek().copied() else {
1939                    heap.push(Reverse(realtime_usec));
1940                    self.refresh_retained_bounds();
1941                    return;
1942                };
1943                if realtime_usec < oldest {
1944                    self.skips_after = self.skips_after.saturating_add(1);
1945                    return;
1946                }
1947                heap.pop();
1948                heap.push(Reverse(realtime_usec));
1949                self.refresh_retained_bounds();
1950                self.shifts = self.shifts.saturating_add(1);
1951            }
1952            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1953                if heap.len() < self.limit {
1954                    heap.push(realtime_usec);
1955                    self.add_retained_bound(realtime_usec);
1956                    return;
1957                }
1958                let Some(newest) = heap.peek().copied() else {
1959                    heap.push(realtime_usec);
1960                    self.refresh_retained_bounds();
1961                    return;
1962                };
1963                if realtime_usec > newest {
1964                    self.skips_before = self.skips_before.saturating_add(1);
1965                    return;
1966                }
1967                heap.pop();
1968                heap.push(realtime_usec);
1969                self.refresh_retained_bounds();
1970                self.shifts = self.shifts.saturating_add(1);
1971            }
1972            _ => {}
1973        }
1974    }
1975
1976    fn retained_len(&self) -> usize {
1977        match &self.heap {
1978            NetdataPageHeap::Backward(heap) => heap.len(),
1979            NetdataPageHeap::Forward(heap) => heap.len(),
1980        }
1981    }
1982
1983    fn add_retained_bound(&mut self, realtime_usec: u64) {
1984        self.oldest_retained_usec = Some(
1985            self.oldest_retained_usec
1986                .map_or(realtime_usec, |current| current.min(realtime_usec)),
1987        );
1988        self.newest_retained_usec = Some(
1989            self.newest_retained_usec
1990                .map_or(realtime_usec, |current| current.max(realtime_usec)),
1991        );
1992    }
1993
1994    fn refresh_retained_bounds(&mut self) {
1995        let (oldest, newest) = match &self.heap {
1996            NetdataPageHeap::Backward(heap) => heap
1997                .iter()
1998                .map(|Reverse(usec)| *usec)
1999                .fold((None, None), retained_bounds_fold),
2000            NetdataPageHeap::Forward(heap) => heap
2001                .iter()
2002                .copied()
2003                .fold((None, None), retained_bounds_fold),
2004        };
2005        self.oldest_retained_usec = oldest;
2006        self.newest_retained_usec = newest;
2007    }
2008
2009    fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
2010        match self.direction {
2011            Direction::Backward => {
2012                if self
2013                    .anchor_start_usec
2014                    .is_some_and(|anchor| realtime_usec >= anchor)
2015                {
2016                    self.skips_before = self.skips_before.saturating_add(1);
2017                    return false;
2018                }
2019                if self
2020                    .anchor_stop_usec
2021                    .is_some_and(|anchor| realtime_usec <= anchor)
2022                {
2023                    self.skips_after = self.skips_after.saturating_add(1);
2024                    return false;
2025                }
2026            }
2027            Direction::Forward => {
2028                if self
2029                    .anchor_start_usec
2030                    .is_some_and(|anchor| realtime_usec <= anchor)
2031                {
2032                    self.skips_after = self.skips_after.saturating_add(1);
2033                    return false;
2034                }
2035                if self
2036                    .anchor_stop_usec
2037                    .is_some_and(|anchor| realtime_usec >= anchor)
2038                {
2039                    self.skips_before = self.skips_before.saturating_add(1);
2040                    return false;
2041                }
2042            }
2043        }
2044        true
2045    }
2046
2047    fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2048        match self.direction {
2049            Direction::Backward => {
2050                if self
2051                    .anchor_start_usec
2052                    .is_some_and(|anchor| realtime_usec >= anchor)
2053                {
2054                    return false;
2055                }
2056                if self
2057                    .anchor_stop_usec
2058                    .is_some_and(|anchor| realtime_usec <= anchor)
2059                {
2060                    return false;
2061                }
2062            }
2063            Direction::Forward => {
2064                if self
2065                    .anchor_start_usec
2066                    .is_some_and(|anchor| realtime_usec <= anchor)
2067                {
2068                    return false;
2069                }
2070                if self
2071                    .anchor_stop_usec
2072                    .is_some_and(|anchor| realtime_usec >= anchor)
2073                {
2074                    return false;
2075                }
2076            }
2077        }
2078        true
2079    }
2080
2081    fn counters(&self) -> NetdataPageCounters {
2082        NetdataPageCounters {
2083            matched: self.matched,
2084            before: self.skips_before,
2085            after: self.skips_after.saturating_add(self.shifts),
2086        }
2087    }
2088
2089    fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2090        if self.limit == 0 {
2091            return false;
2092        }
2093        match (&self.heap, self.direction) {
2094            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2095                if heap.len() < self.limit {
2096                    return false;
2097                }
2098                heap.peek().is_some_and(|Reverse(oldest)| {
2099                    realtime_usec < oldest.saturating_sub(slack_usec)
2100                })
2101            }
2102            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2103                if heap.len() < self.limit {
2104                    return false;
2105                }
2106                heap.peek()
2107                    .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2108            }
2109            _ => false,
2110        }
2111    }
2112}
2113
/// Fold step that tracks the minimum and maximum of a stream of timestamps.
///
/// Takes the `(oldest, newest)` pair accumulated so far and the next
/// timestamp, and returns the pair widened to include it. Both halves of the
/// result are always `Some`.
fn retained_bounds_fold(
    (oldest, newest): (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let lowest = match oldest {
        Some(current) if current < realtime_usec => current,
        _ => realtime_usec,
    };
    let highest = match newest {
        Some(current) if current > realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(lowest), Some(highest))
}
2123
2124impl CombinedResult {
2125    fn merge(
2126        &mut self,
2127        path: &Path,
2128        result: ExplorerResult,
2129        direction: Direction,
2130        limit: usize,
2131    ) -> Result<()> {
2132        let ExplorerResult {
2133            rows,
2134            facets,
2135            histogram,
2136            stats,
2137            column_fields,
2138            ..
2139        } = result;
2140
2141        if let Some(histogram) = histogram {
2142            merge_histogram(&mut self.histogram, histogram)?;
2143        }
2144
2145        self.merge_stats(stats);
2146        for row in rows {
2147            self.rows.push(LocatedRow {
2148                file_path: path.to_path_buf(),
2149                row,
2150            });
2151        }
2152        for field in column_fields {
2153            if let Ok(field) = String::from_utf8(field) {
2154                self.column_fields.insert(field);
2155            }
2156        }
2157        for (field, values) in facets {
2158            let target = self.facets.entry(field).or_default();
2159            for (value, count) in values {
2160                add_netdata_facet_count(target, &value, count);
2161            }
2162        }
2163        self.sort_and_limit(direction, limit);
2164        Ok(())
2165    }
2166
2167    fn add_column_fields<I>(&mut self, fields: I)
2168    where
2169        I: IntoIterator<Item = String>,
2170    {
2171        self.column_fields.extend(fields);
2172    }
2173
2174    fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2175        match direction {
2176            Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2177            Direction::Backward => self
2178                .rows
2179                .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2180        }
2181        make_row_timestamps_unique(&mut self.rows, direction);
2182        if self.rows.len() > limit {
2183            self.rows.truncate(limit);
2184        }
2185        self.stats.rows_returned = self.rows.len() as u64;
2186    }
2187
2188    fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2189        if self.rows.is_empty() {
2190            self.stats.rows_returned = 0;
2191            return;
2192        }
2193
2194        let mut rows = Vec::with_capacity(self.rows.len());
2195        for mut located in self.rows.drain(..) {
2196            if !located.row.payloads.is_empty() {
2197                rows.push(located);
2198                continue;
2199            }
2200            match expand_located_row_payloads(&mut located, reader_options) {
2201                Ok(()) => {
2202                    self.stats.returned_row_expansions =
2203                        self.stats.returned_row_expansions.saturating_add(1);
2204                    rows.push(located);
2205                }
2206                Err(err) => {
2207                    self.partial = true;
2208                    self.file_errors.push(format!(
2209                        "{} cursor {}: {err}",
2210                        located.file_path.display(),
2211                        located.row.cursor
2212                    ));
2213                }
2214            }
2215        }
2216        self.rows = rows;
2217        self.stats.rows_returned = self.rows.len() as u64;
2218    }
2219
2220    fn merge_stats(&mut self, stats: ExplorerStats) {
2221        self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2222        self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2223        self.stats.facet_rows_matched = self
2224            .stats
2225            .facet_rows_matched
2226            .saturating_add(stats.facet_rows_matched);
2227        self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2228        self.stats.rows_unsampled = self
2229            .stats
2230            .rows_unsampled
2231            .saturating_add(stats.rows_unsampled);
2232        self.stats.rows_estimated = self
2233            .stats
2234            .rows_estimated
2235            .saturating_add(stats.rows_estimated);
2236        self.stats.sampling_sampled = self
2237            .stats
2238            .sampling_sampled
2239            .saturating_add(stats.sampling_sampled);
2240        self.stats.sampling_unsampled = self
2241            .stats
2242            .sampling_unsampled
2243            .saturating_add(stats.sampling_unsampled);
2244        self.stats.sampling_estimated = self
2245            .stats
2246            .sampling_estimated
2247            .saturating_add(stats.sampling_estimated);
2248        if stats.last_realtime_usec > self.stats.last_realtime_usec {
2249            self.stats.last_realtime_usec = stats.last_realtime_usec;
2250        }
2251        if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2252            self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2253        }
2254        self.stats.data_refs_seen = self
2255            .stats
2256            .data_refs_seen
2257            .saturating_add(stats.data_refs_seen);
2258        self.stats.data_refs_skipped = self
2259            .stats
2260            .data_refs_skipped
2261            .saturating_add(stats.data_refs_skipped);
2262        self.stats.data_payloads_loaded = self
2263            .stats
2264            .data_payloads_loaded
2265            .saturating_add(stats.data_payloads_loaded);
2266        self.stats.data_objects_classified = self
2267            .stats
2268            .data_objects_classified
2269            .saturating_add(stats.data_objects_classified);
2270        self.stats.data_cache_hits = self
2271            .stats
2272            .data_cache_hits
2273            .saturating_add(stats.data_cache_hits);
2274        self.stats.data_cache_misses = self
2275            .stats
2276            .data_cache_misses
2277            .saturating_add(stats.data_cache_misses);
2278        self.stats.payloads_decompressed = self
2279            .stats
2280            .payloads_decompressed
2281            .saturating_add(stats.payloads_decompressed);
2282        self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2283        self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2284        self.stats.histogram_updates = self
2285            .stats
2286            .histogram_updates
2287            .saturating_add(stats.histogram_updates);
2288        self.stats.returned_row_expansions = self
2289            .stats
2290            .returned_row_expansions
2291            .saturating_add(stats.returned_row_expansions);
2292        self.stats.early_stop_opportunities = self
2293            .stats
2294            .early_stop_opportunities
2295            .saturating_add(stats.early_stop_opportunities);
2296        self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2297    }
2298
2299    fn add_zero_count_facet_values(
2300        &mut self,
2301        vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2302    ) {
2303        for (field, values) in vocabulary {
2304            let target = self.facets.entry(field.clone()).or_default();
2305            for value in values.keys() {
2306                add_netdata_facet_count(target, value, 0);
2307            }
2308        }
2309    }
2310
2311    fn add_zero_count_facet_values_from_files(
2312        &mut self,
2313        fields: &[Vec<u8>],
2314        reader_options: ReaderOptions,
2315    ) {
2316        for path in &self.matched_paths {
2317            let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2318                continue;
2319            };
2320            for field in fields {
2321                let Ok(field_name) = std::str::from_utf8(field) else {
2322                    continue;
2323                };
2324                let Ok(values) = reader.query_unique(field_name) else {
2325                    continue;
2326                };
2327                if values.is_empty() {
2328                    continue;
2329                }
2330                let target = self.facets.entry(field.clone()).or_default();
2331                for value in values {
2332                    add_netdata_facet_count(target, &value, 0);
2333                }
2334            }
2335        }
2336    }
2337
2338    fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2339        let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2340        if let Some(histogram) = &request.histogram {
2341            report_fields.insert(histogram.as_bytes().to_vec());
2342        }
2343        for filter in &request.filters {
2344            if !report_fields.contains(&filter.field) {
2345                continue;
2346            }
2347            let target = self.facets.entry(filter.field.clone()).or_default();
2348            for value in &filter.values {
2349                add_netdata_facet_count(target, value, 0);
2350            }
2351        }
2352    }
2353
2354    fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2355        string_fields(&self.reportable_facet_fields_bytes(requested))
2356    }
2357
2358    fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2359        let mut fields = Vec::new();
2360        for field in requested {
2361            if !fields.contains(field) {
2362                fields.push(field.clone());
2363            }
2364        }
2365        fields
2366    }
2367}
2368
2369fn netdata_function_error(status: u64, message: &str) -> Value {
2370    json!({
2371        "status": status,
2372        "errorMessage": message,
2373    })
2374}
2375
2376fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2377    if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2378        return Value::String("OK".to_string());
2379    }
2380
2381    let total = stats
2382        .rows_examined
2383        .saturating_add(stats.rows_unsampled)
2384        .saturating_add(stats.rows_estimated)
2385        .max(1);
2386    let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2387    let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2388    let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2389
2390    let mut title = String::new();
2391    let mut description = String::new();
2392    let mut status = "notice";
2393    if timed_out {
2394        title.push_str("Query timed-out, incomplete data. ");
2395        description.push_str(
2396            "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2397        );
2398        status = "warning";
2399    }
2400    if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2401        title.push_str(&format!("{real_percent:.2}% real data"));
2402        description.push_str(&format!(
2403            "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2404        ));
2405    }
2406    if stats.rows_unsampled != 0 {
2407        title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2408        description.push_str(&format!(
2409            "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2410        ));
2411    }
2412    if stats.rows_estimated != 0 {
2413        title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2414        description.push_str(&format!(
2415            "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2416        ));
2417    }
2418
2419    json!({
2420        "title": title,
2421        "status": status,
2422        "description": description,
2423    })
2424}
2425
2426fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2427    let mut stats = completed.clone();
2428    stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2429    stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2430    stats.facet_rows_matched = stats
2431        .facet_rows_matched
2432        .saturating_add(current.facet_rows_matched);
2433    stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2434    stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2435    stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2436    stats.sampling_sampled = stats
2437        .sampling_sampled
2438        .saturating_add(current.sampling_sampled);
2439    stats.sampling_unsampled = stats
2440        .sampling_unsampled
2441        .saturating_add(current.sampling_unsampled);
2442    stats.sampling_estimated = stats
2443        .sampling_estimated
2444        .saturating_add(current.sampling_estimated);
2445    if current.last_realtime_usec > stats.last_realtime_usec {
2446        stats.last_realtime_usec = current.last_realtime_usec;
2447    }
2448    if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2449        stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2450    }
2451    stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2452    stats.data_refs_skipped = stats
2453        .data_refs_skipped
2454        .saturating_add(current.data_refs_skipped);
2455    stats.data_payloads_loaded = stats
2456        .data_payloads_loaded
2457        .saturating_add(current.data_payloads_loaded);
2458    stats.data_objects_classified = stats
2459        .data_objects_classified
2460        .saturating_add(current.data_objects_classified);
2461    stats.data_cache_hits = stats
2462        .data_cache_hits
2463        .saturating_add(current.data_cache_hits);
2464    stats.data_cache_misses = stats
2465        .data_cache_misses
2466        .saturating_add(current.data_cache_misses);
2467    stats.payloads_decompressed = stats
2468        .payloads_decompressed
2469        .saturating_add(current.payloads_decompressed);
2470    stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2471    stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2472    stats.histogram_updates = stats
2473        .histogram_updates
2474        .saturating_add(current.histogram_updates);
2475    stats.returned_row_expansions = stats
2476        .returned_row_expansions
2477        .saturating_add(current.returned_row_expansions);
2478    stats.early_stop_opportunities = stats
2479        .early_stop_opportunities
2480        .saturating_add(current.early_stop_opportunities);
2481    stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2482    stats
2483}
2484
/// Column description attached to a query response.
struct Columns {
    /// Column keys as an ordered list (presumably the display order — confirm against
    /// the code that builds this struct).
    order: Vec<String>,
    /// Per-column metadata objects keyed by column name; `base_query_response`
    /// serializes this map as the `columns` entry of the response.
    map: Map<String, Value>,
}
2489
/// Pre-rendered pieces of a query response, produced before the final JSON object
/// is assembled.
struct QueryResponseArtifacts {
    /// Names of the facet fields considered reportable (presumably those accepted by
    /// `facet_group_is_reportable` — confirm against the producer).
    reportable_facet_field_names: Vec<String>,
    /// Column order and metadata; the metadata map becomes the `columns` entry.
    columns: Columns,
    /// Rendered rows; serialized as the `data` entry by `base_query_response`.
    data: Vec<Value>,
    /// Rendered facets; inserted by `add_analysis_outputs_if_needed`.
    facets: Value,
    /// Rendered histogram, if any; a missing histogram is emitted as JSON `null`.
    histogram: Option<Value>,
    /// Response message value (not consumed by the functions visible in this section).
    message: Value,
    /// Item counters; inserted by `add_analysis_outputs_if_needed`.
    items: Value,
}
2499
2500fn base_query_response(
2501    request: &NetdataRequest,
2502    combined: &CombinedResult,
2503    artifacts: &QueryResponseArtifacts,
2504) -> Value {
2505    json!({
2506        "_request": &request.echo,
2507        "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2508        "_journal_files": {
2509            "matched": combined.matched_files,
2510            "skipped": combined.skipped_files,
2511            "errors": &combined.file_errors,
2512        },
2513        "status": 200,
2514        "partial": combined.partial,
2515        "type": "table",
2516        "show_ids": true,
2517        "has_history": true,
2518        "pagination": {
2519            "enabled": true,
2520            "key": "anchor",
2521            "column": "timestamp",
2522            "units": "timestamp_usec",
2523        },
2524        "columns": &artifacts.columns.map,
2525        "data": &artifacts.data,
2526        "_stats": {
2527            "sdk_explorer": &combined.stats,
2528        },
2529        "expires": if request.data_only {
2530            unix_now_seconds().saturating_add(3600)
2531        } else {
2532            0
2533        }
2534    })
2535}
2536
2537fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2538    let unsampled = combined.stats.rows_unsampled;
2539    let estimated = combined.stats.rows_estimated;
2540    let fallback_rows_after_returned =
2541        response_fallback_rows_after_returned(&combined.stats, returned);
2542    let page_counters = combined
2543        .page_counters
2544        .unwrap_or_else(|| NetdataPageCounters {
2545            matched: combined.stats.rows_matched,
2546            before: 0,
2547            after: fallback_rows_after_returned,
2548        });
2549    json!({
2550        "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2551        "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2552        "unsampled": unsampled,
2553        "estimated": estimated,
2554        "returned": returned,
2555        "max_to_return": request.limit as u64,
2556        "before": page_counters.before,
2557        "after": page_counters.after,
2558    })
2559}
2560
2561fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2562    let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2563        stats.rows_examined
2564    } else {
2565        stats.rows_matched
2566    };
2567    source_rows.saturating_sub(returned)
2568}
2569
2570fn add_last_modified_if_needed(
2571    object: &mut Map<String, Value>,
2572    request: &NetdataRequest,
2573    combined: &CombinedResult,
2574) {
2575    if !request.data_only || request.tail {
2576        object.insert(
2577            "last_modified".to_string(),
2578            Value::from(combined.stats.last_realtime_usec),
2579        );
2580    }
2581}
2582
2583fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2584    if combined.sampling_enabled {
2585        object.insert(
2586            "_sampling".to_string(),
2587            json!({
2588                "enabled": true,
2589                "sampled": combined.stats.sampling_sampled,
2590                "unsampled": combined.stats.sampling_unsampled,
2591                "estimated": combined.stats.sampling_estimated,
2592            }),
2593        );
2594    }
2595}
2596
2597fn add_analysis_outputs_if_needed(
2598    object: &mut Map<String, Value>,
2599    request: &NetdataRequest,
2600    artifacts: QueryResponseArtifacts,
2601) {
2602    if !request.data_only || request.delta {
2603        let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2604        object.insert(facets_key.to_string(), artifacts.facets);
2605        object.insert(
2606            histogram_key.to_string(),
2607            artifacts.histogram.unwrap_or(Value::Null),
2608        );
2609        object.insert(items_key.to_string(), artifacts.items);
2610    }
2611}
2612
/// Returns the response keys for (facets, histogram, items).
///
/// Data-only responses use the `*_delta` variants of the key names.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2620
2621fn merge_histogram(
2622    target: &mut Option<ExplorerHistogram>,
2623    source: ExplorerHistogram,
2624) -> Result<()> {
2625    let Some(target) = target else {
2626        *target = Some(source);
2627        return Ok(());
2628    };
2629    if target.field != source.field
2630        || target.buckets.len() != source.buckets.len()
2631        || target
2632            .buckets
2633            .iter()
2634            .zip(source.buckets.iter())
2635            .any(|(target_bucket, source_bucket)| {
2636                target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2637                    || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2638            })
2639    {
2640        return Err(SdkError::Unsupported(
2641            "inconsistent Netdata histogram bucket shape",
2642        ));
2643    }
2644    for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2645        let Some(target_bucket) = target.buckets.get_mut(index) else {
2646            return Err(SdkError::Unsupported(
2647                "inconsistent Netdata histogram bucket shape",
2648            ));
2649        };
2650        for (value, count) in source_bucket.values {
2651            *target_bucket.values.entry(value).or_default() += count;
2652        }
2653    }
2654    Ok(())
2655}
2656
/// Reports whether a facet has at least one meaningful value.
///
/// Empty values and the placeholder `-` do not count; counts are ignored.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    let is_placeholder = |value: &Vec<u8>| value.is_empty() || value.as_slice() == b"-";
    !values.keys().all(is_placeholder)
}
2662
2663fn netdata_facet_value(value: &[u8]) -> &[u8] {
2664    if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2665        &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2666    } else {
2667        value
2668    }
2669}
2670
2671fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2672    *target
2673        .entry(netdata_facet_value(value).to_vec())
2674        .or_default() += count;
2675}
2676
2677fn not_modified_before_scan_response(
2678    request: &NetdataRequest,
2679    selected: &SelectedJournalFiles,
2680) -> Option<Value> {
2681    if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2682        Some(netdata_function_error(
2683            304,
2684            "No new data since the previous call.",
2685        ))
2686    } else {
2687        None
2688    }
2689}
2690
2691fn should_collect_unfiltered_facet_vocabulary(
2692    request: &NetdataRequest,
2693    combined: &CombinedResult,
2694) -> bool {
2695    !request.data_only && !combined.partial && !request.filters.is_empty()
2696}
2697
2698fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2699    ProgressContext {
2700        current_file: file_index + 1,
2701        total_files,
2702        started,
2703    }
2704}
2705
2706fn emit_netdata_progress(
2707    options: &mut NetdataFunctionRunOptions<'_>,
2708    progress: NetdataFunctionProgress,
2709) {
2710    if let Some(callback) = options.progress_callback.as_deref_mut() {
2711        callback(progress);
2712    }
2713}
2714
2715fn emit_progress_for_combined(
2716    options: &mut NetdataFunctionRunOptions<'_>,
2717    combined: &CombinedResult,
2718    context: ProgressContext,
2719) {
2720    emit_netdata_progress(
2721        options,
2722        NetdataFunctionProgress {
2723            current_file: context.current_file,
2724            total_files: context.total_files,
2725            matched_files: combined.matched_files,
2726            skipped_files: combined.skipped_files,
2727            stats: combined.stats.clone(),
2728            elapsed: context.started.elapsed(),
2729        },
2730    );
2731}
2732
2733fn emit_explorer_progress(
2734    options: &mut NetdataFunctionRunOptions<'_>,
2735    combined: &CombinedResult,
2736    progress: ExplorerProgress,
2737    context: ProgressContext,
2738) {
2739    let stats = merged_progress_stats(&combined.stats, &progress.stats);
2740    if let Some(callback) = options.progress_callback.as_deref_mut() {
2741        callback(NetdataFunctionProgress {
2742            current_file: context.current_file,
2743            total_files: context.total_files,
2744            matched_files: combined.matched_files,
2745            skipped_files: combined.skipped_files,
2746            stats,
2747            elapsed: context.started.elapsed(),
2748        });
2749    }
2750}
2751
2752fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2753    options
2754        .cancellation_callback
2755        .is_some_and(|is_cancelled| is_cancelled())
2756}
2757
2758fn should_stop_before_file(
2759    combined: &mut CombinedResult,
2760    deadline: Option<Instant>,
2761    options: &NetdataFunctionRunOptions<'_>,
2762) -> bool {
2763    if request_cancelled(options) {
2764        combined.partial = true;
2765        combined.cancelled = true;
2766        return true;
2767    }
2768    if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2769        combined.partial = true;
2770        combined.timed_out = true;
2771        return true;
2772    }
2773    false
2774}
2775
2776fn collect_column_fields_for_file(
2777    reader: &mut FileReader,
2778    request: &NetdataRequest,
2779    path: &Path,
2780    combined: &mut CombinedResult,
2781) {
2782    if request.data_only {
2783        return;
2784    }
2785    match reader.enumerate_fields_indexed() {
2786        Ok(fields) => combined.add_column_fields(fields),
2787        Err(err) => combined.file_errors.push(format!(
2788            "{}: FIELD index enumeration failed: {err}",
2789            path.display()
2790        )),
2791    }
2792}
2793
2794fn record_explore_result(
2795    result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2796    path: &Path,
2797    combined: &mut CombinedResult,
2798) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2799    match result {
2800        Ok(result) => Some(result),
2801        Err(err) => {
2802            combined.skipped_files = combined.skipped_files.saturating_add(1);
2803            combined
2804                .file_errors
2805                .push(format!("{}: {err}", path.display()));
2806            None
2807        }
2808    }
2809}
2810
2811fn delta_scan_can_stop(
2812    request: &NetdataRequest,
2813    page_window: &RefCell<NetdataPageWindow>,
2814    realtime_usec: u64,
2815    rows_matched: u64,
2816    realtime_delta_usec: u64,
2817) -> bool {
2818    let mut page_window = page_window.borrow_mut();
2819    page_window.observe(realtime_usec);
2820    request.data_only
2821        && request.delta
2822        && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2823        && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2824}
2825
2826#[allow(clippy::too_many_arguments)]
2827fn finish_explored_file(
2828    options: &mut NetdataFunctionRunOptions<'_>,
2829    request: &NetdataRequest,
2830    file: &SelectedJournalFile,
2831    query: &ExplorerQuery,
2832    result: ExplorerResult,
2833    stop_reason: Option<ExplorerStopReason>,
2834    combined: &mut CombinedResult,
2835    files: &[SelectedJournalFile],
2836    file_index: usize,
2837    progress: ProgressContext,
2838) -> Result<bool> {
2839    update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2840    combined.merge(&file.path, result, query.direction, request.limit)?;
2841    emit_progress_for_combined(options, combined, progress);
2842    if request_cancelled(options) {
2843        combined.partial = true;
2844        combined.cancelled = true;
2845        return Ok(true);
2846    }
2847    if let Some(reason) = stop_reason {
2848        combined.partial = true;
2849        match reason {
2850            ExplorerStopReason::TimedOut => combined.timed_out = true,
2851            ExplorerStopReason::Cancelled => combined.cancelled = true,
2852        }
2853        return Ok(true);
2854    }
2855    Ok(request.data_only
2856        && !request.delta
2857        && !request.tail
2858        && combined.rows.len() >= request.limit
2859        && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2860}
2861
2862fn file_metadata(
2863    options: &NetdataFunctionRunOptions<'_>,
2864    path: &Path,
2865) -> Option<NetdataJournalFileMetadata> {
2866    options
2867        .state
2868        .as_deref()
2869        .and_then(|state| state.file_metadata(path))
2870}
2871
2872fn update_learned_realtime_delta(
2873    options: &mut NetdataFunctionRunOptions<'_>,
2874    path: &Path,
2875    order: &JournalFileOrderInfo,
2876    stats: &ExplorerStats,
2877) {
2878    let learned_delta = stats.max_source_realtime_delta_usec;
2879    if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2880        return;
2881    }
2882    let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2883    if learned_delta <= order.journal_vs_realtime_delta_usec {
2884        return;
2885    }
2886    if let Some(state) = options.state.as_deref_mut() {
2887        state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2888    }
2889}
2890
2891fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2892    delta_usec
2893        .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2894        .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2895}
2896
/// Test-only check of whether a file header's time range can overlap the request.
///
/// A header without a tail timestamp always qualifies. Otherwise the head/tail range
/// is widened by `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC` on both sides before it
/// is compared with the request's optional `after` / `before` bounds.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let slack = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC;
    let earliest = header.head_entry_realtime.saturating_sub(slack);
    let latest = header.tail_entry_realtime.saturating_add(slack);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);

    !(ends_before_window || starts_after_window)
}
2925
/// A journal file chosen for a request, together with its ordering information.
#[derive(Debug)]
struct SelectedJournalFile {
    /// Location of the journal file.
    path: PathBuf,
    /// Ordering/time-range information used to sort files and to prune files that
    /// cannot overlap the request.
    order: JournalFileOrderInfo,
}
2931
/// Result of selecting journal files for a request.
#[derive(Debug, Default)]
struct SelectedJournalFiles {
    /// Selected files, sorted for the request's direction (ties broken by path).
    files: Vec<SelectedJournalFile>,
    /// True when at least one selected file's last message timestamp is newer than
    /// the request's `if_modified_since_usec`.
    files_are_newer: bool,
}
2937
2938fn select_journal_files_for_request(
2939    paths: Vec<PathBuf>,
2940    request: &NetdataRequest,
2941    reader_options: ReaderOptions,
2942    options: &NetdataFunctionRunOptions<'_>,
2943) -> SelectedJournalFiles {
2944    let mut selected = Vec::new();
2945    for path in paths {
2946        let metadata = file_metadata(options, &path);
2947        if !request.matches_source(&path, metadata.as_ref()) {
2948            continue;
2949        }
2950        let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2951        if !journal_file_order_may_overlap_request(&order, request) {
2952            continue;
2953        }
2954        selected.push(SelectedJournalFile { path, order });
2955    }
2956    selected.sort_by(|left, right| {
2957        compare_journal_file_order(&left.order, &right.order, request.direction)
2958            .then_with(|| left.path.cmp(&right.path))
2959    });
2960    let files_are_newer = selected
2961        .iter()
2962        .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2963    SelectedJournalFiles {
2964        files: selected,
2965        files_are_newer,
2966    }
2967}
2968
2969fn remaining_files_cannot_affect_data_page(
2970    combined: &CombinedResult,
2971    request: &NetdataRequest,
2972    files: &[SelectedJournalFile],
2973    next_file_index: usize,
2974) -> bool {
2975    let Some(next) = files.get(next_file_index) else {
2976        return true;
2977    };
2978    let slack = next.order.journal_vs_realtime_delta_usec;
2979    match request.direction {
2980        Direction::Backward => {
2981            let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2982            else {
2983                return false;
2984            };
2985            next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2986        }
2987        Direction::Forward => {
2988            let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2989            else {
2990                return false;
2991            };
2992            next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2993        }
2994    }
2995}
2996
2997fn journal_file_order_may_overlap_request(
2998    info: &JournalFileOrderInfo,
2999    request: &NetdataRequest,
3000) -> bool {
3001    if info.msg_last_realtime_usec == 0 {
3002        return true;
3003    }
3004
3005    let first = info
3006        .msg_first_realtime_usec
3007        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
3008    let last = info
3009        .msg_last_realtime_usec
3010        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
3011
3012    if request
3013        .after_realtime_usec
3014        .is_some_and(|after| last < after)
3015    {
3016        return false;
3017    }
3018    if request
3019        .before_realtime_usec
3020        .is_some_and(|before| first > before)
3021    {
3022        return false;
3023    }
3024
3025    true
3026}
3027
3028fn collect_boot_first_realtime(
3029    paths: &[PathBuf],
3030    reader_options: ReaderOptions,
3031    needed_boot_ids: &BTreeSet<Vec<u8>>,
3032) -> BTreeMap<Vec<u8>, u64> {
3033    let mut out = BTreeMap::new();
3034    if needed_boot_ids.is_empty() {
3035        return out;
3036    }
3037    for path in paths {
3038        let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
3039            continue;
3040        };
3041        let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
3042            continue;
3043        };
3044        for boot_id in boot_ids {
3045            if !needed_boot_ids.contains(&boot_id) {
3046                continue;
3047            }
3048            let mut match_payload = b"_BOOT_ID=".to_vec();
3049            match_payload.extend_from_slice(&boot_id);
3050            reader.flush_matches();
3051            reader.add_match(&match_payload);
3052            reader.seek_head();
3053            if !reader.next().unwrap_or(false) {
3054                continue;
3055            }
3056            if let Ok(realtime) = reader.get_realtime_usec() {
3057                record_boot_first_realtime(&mut out, boot_id, realtime);
3058            }
3059        }
3060        reader.flush_matches();
3061    }
3062    out
3063}
3064
3065fn response_boot_ids(
3066    column_order: &[String],
3067    rows: &[LocatedRow],
3068    facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3069    histogram: Option<&ExplorerHistogram>,
3070) -> BTreeSet<Vec<u8>> {
3071    let mut boot_ids = BTreeSet::new();
3072    let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3073    if row_needs_boot_id {
3074        for row in rows {
3075            if let Some(values) = row_fields(row).get("_BOOT_ID") {
3076                boot_ids.extend(values.iter().cloned());
3077            }
3078        }
3079    }
3080    if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3081        boot_ids.extend(
3082            values
3083                .keys()
3084                .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3085                .cloned(),
3086        );
3087    }
3088    if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3089        for bucket in &histogram.buckets {
3090            boot_ids.extend(
3091                bucket
3092                    .values
3093                    .keys()
3094                    .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3095                    .cloned(),
3096            );
3097        }
3098    }
3099    boot_ids
3100}
3101
/// Remembers `realtime_usec` for `boot_id` if it is the earliest timestamp seen so far.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
3112
3113fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3114    let mut fields = BTreeMap::new();
3115    for payload in &row.row.payloads {
3116        let Some((field, value)) = split_payload(payload) else {
3117            continue;
3118        };
3119        fields
3120            .entry(String::from_utf8_lossy(field).into_owned())
3121            .or_insert_with(Vec::new)
3122            .push(value.to_vec());
3123    }
3124    fields.insert(
3125        "ND_JOURNAL_FILE".to_string(),
3126        vec![row.file_path.display().to_string().into_bytes()],
3127    );
3128    if !fields.contains_key("ND_JOURNAL_PROCESS") {
3129        let process = dynamic_process_name(&fields);
3130        if !process.is_empty() {
3131            fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3132        }
3133    }
3134    fields
3135}
3136
3137fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
3138    let base = first_value(fields, "CONTAINER_NAME")
3139        .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
3140        .or_else(|| first_value(fields, "_COMM"))
3141        .map(|value| String::from_utf8_lossy(value).into_owned())
3142        .unwrap_or_default();
3143    if base.is_empty() {
3144        return "-".to_string();
3145    }
3146    match first_value(fields, "_PID") {
3147        Some(pid) if !pid.is_empty() => {
3148            let pid = String::from_utf8_lossy(pid);
3149            format!("{base}[{pid}]")
3150        }
3151        Some(_) => base,
3152        None => format!("{base}[-]"),
3153    }
3154}
3155
/// Returns the first value stored for `field`, or `None` when the field is absent
/// or has no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    values.first().map(|value| value.as_slice())
}
3162
/// Splits a `FIELD=value` payload at the first `=`.
///
/// Returns the field name and the value (which may itself contain `=`), or `None`
/// when the payload has no `=` at all.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    // `rest` starts with the separator itself; drop it.
    Some((field, &rest[1..]))
}
3167
3168fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3169    let mut last_from = 0u64;
3170    let mut last_to = 0u64;
3171    let mut initialized = false;
3172    for row in rows {
3173        let timestamp = row.row.realtime_usec;
3174        if initialized && timestamp >= last_from && timestamp <= last_to {
3175            match direction {
3176                Direction::Backward => {
3177                    last_from = last_from.saturating_sub(1);
3178                    row.row.realtime_usec = last_from;
3179                }
3180                Direction::Forward => {
3181                    last_to = last_to.saturating_add(1);
3182                    row.row.realtime_usec = last_to;
3183                }
3184            }
3185        } else {
3186            last_from = timestamp;
3187            last_to = timestamp;
3188            initialized = true;
3189        }
3190    }
3191}
3192
3193fn column_metadata(key: &str, index: usize) -> Value {
3194    let (visible, filter, full_width) = match key {
3195        "timestamp" => (true, "range", false),
3196        "rowOptions" => (false, "none", false),
3197        "_HOSTNAME" => (true, "facet", false),
3198        "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3199        "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3200        _ if systemd_column_is_facet(key) => (false, "facet", false),
3201        _ => (false, "none", false),
3202    };
3203    let column_type = if key == "timestamp" {
3204        "timestamp"
3205    } else if key == "rowOptions" {
3206        "none"
3207    } else {
3208        "string"
3209    };
3210    let visualization = if key == "rowOptions" {
3211        "rowOptions"
3212    } else {
3213        "value"
3214    };
3215    let mut metadata = json!({
3216        "index": index,
3217        "unique_key": key == "timestamp",
3218        "name": if key == "timestamp" { "Timestamp" } else { key },
3219        "visible": visible,
3220        "type": column_type,
3221        "visualization": visualization,
3222        "value_options": {
3223            "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3224            "decimal_points": 0,
3225            "default_value": if key == "timestamp" || key == "rowOptions" {
3226                Value::Null
3227            } else {
3228                Value::String("-".to_string())
3229            },
3230        },
3231        "sort": "ascending",
3232        "sortable": false,
3233        "sticky": false,
3234        "summary": "count",
3235        "filter": filter,
3236        "full_width": full_width,
3237        "wrap": key != "rowOptions",
3238        "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3239    });
3240    if key == "rowOptions" {
3241        if let Some(object) = metadata.as_object_mut() {
3242            object.insert("dummy".to_string(), Value::Bool(true));
3243        }
3244    }
3245    metadata
3246}
3247
/// Decides whether a journal field is offered as a facet.
///
/// `MESSAGE_ID` always is. Any other key is excluded when it contains `MESSAGE` or
/// `TIMESTAMP`, or when it starts with `__`; everything else is a facet.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
3257
3258fn sort_facet_options(field: &str, options: &mut [Value]) {
3259    options.sort_by(|left, right| {
3260        let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3261        let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3262        if field == "PRIORITY" {
3263            return parse_priority(left_id).cmp(&parse_priority(right_id));
3264        }
3265        let left_count = left
3266            .get("count")
3267            .and_then(Value::as_u64)
3268            .unwrap_or_default();
3269        let right_count = right
3270            .get("count")
3271            .and_then(Value::as_u64)
3272            .unwrap_or_default();
3273        right_count
3274            .cmp(&left_count)
3275            .then_with(|| left_id.cmp(right_id))
3276    });
3277}
3278
3279fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3280    let bytes = query.as_bytes();
3281    let mut index = 0usize;
3282    let mut terms = Vec::new();
3283    let mut positives = Vec::new();
3284    let mut negatives = Vec::new();
3285
3286    while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3287        push_fts_pattern(
3288            pattern,
3289            negative,
3290            &mut terms,
3291            &mut positives,
3292            &mut negatives,
3293        );
3294    }
3295
3296    (terms, positives, negatives)
3297}
3298
3299fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3300    while *index < bytes.len() {
3301        skip_fts_separators(bytes, index);
3302        let negative = consume_fts_negative_marker(bytes, index);
3303        let pattern = read_fts_pattern(bytes, index);
3304        if !pattern.is_empty() {
3305            return Some((pattern, negative));
3306        }
3307    }
3308    None
3309}
3310
/// Advances `*index` past any run of `|` separators starting at that position.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    let run = bytes
        .get(*index..)
        .map_or(0, |rest| rest.iter().take_while(|&&byte| byte == b'|').count());
    *index += run;
}
3316
/// Consumes a leading `!` at `*index`, if present, and reports whether it did.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let negated = matches!(bytes.get(*index), Some(b'!'));
    if negated {
        *index += 1;
    }
    negated
}
3325
/// Reads one pattern starting at `*index`, up to the next unescaped `|` or the end.
///
/// A backslash escapes the byte that follows it (so `\|` yields a literal `|` and
/// `\\` a literal backslash) and is itself dropped. The terminating `|` is consumed
/// but not included. `*index` ends up just past everything that was read.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut pattern = Vec::new();
    let mut escaped = false;
    while let Some(&byte) = bytes.get(*index) {
        *index += 1;
        match (byte, escaped) {
            (b'\\', false) => escaped = true,
            (b'|', false) => break,
            _ => {
                pattern.push(byte);
                escaped = false;
            }
        }
    }
    pattern
}
3344
3345fn push_fts_pattern(
3346    pattern: Vec<u8>,
3347    negative: bool,
3348    terms: &mut Vec<ExplorerFtsPattern>,
3349    positives: &mut Vec<Vec<u8>>,
3350    negatives: &mut Vec<Vec<u8>>,
3351) {
3352    terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3353    if negative {
3354        negatives.push(pattern);
3355    } else {
3356        positives.push(pattern);
3357    }
3358}
3359
3360fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3361    let Some(Value::Object(selections)) = value else {
3362        return Vec::new();
3363    };
3364    let mut filters = Vec::new();
3365    for (field, values) in selections {
3366        if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3367            continue;
3368        }
3369        let Some(values) = parse_string_array(Some(values)) else {
3370            continue;
3371        };
3372        filters.push(ExplorerFilter::new(
3373            field.as_bytes().to_vec(),
3374            values
3375                .into_iter()
3376                .map(|value| normalize_filter_value(field, &value)),
3377        ));
3378    }
3379    filters
3380}
3381
/// The journal sources selected by a request.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags named in the request
    /// (`SOURCE_TYPE_ALL` when the request names no sources).
    source_type: u64,
    /// Requested source names that are not one of the predefined source types.
    exact_sources: Vec<String>,
}
3387
3388fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3389    let mut selection = SourceSelection {
3390        source_type: SOURCE_TYPE_ALL,
3391        exact_sources: Vec::new(),
3392    };
3393    let Some(Value::Object(selections)) = value else {
3394        return selection;
3395    };
3396    let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3397        return selection;
3398    };
3399    selection.source_type = 0;
3400    for value in values {
3401        match source_type_for_name(&value) {
3402            Some(source_type) => selection.source_type |= source_type,
3403            None => selection.exact_sources.push(value),
3404        }
3405    }
3406    selection
3407}
3408
3409fn source_type_for_name(value: &str) -> Option<u64> {
3410    match value {
3411        "all" => Some(SOURCE_TYPE_ALL),
3412        "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3413        "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3414        "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3415        "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3416        "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3417        "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3418        _ => None,
3419    }
3420}
3421
3422fn journal_file_source_type(path: &Path) -> u64 {
3423    let text = path.to_string_lossy();
3424    let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3425        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3426    };
3427    if text.contains("/remote/") {
3428        return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3429    }
3430    if local_namespace_source_name(path).is_some() {
3431        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3432    }
3433    if name.starts_with("system") {
3434        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3435    }
3436    if name.starts_with("user") {
3437        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3438    }
3439    SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3440}
3441
/// Derives a `namespace-<name>` source name from the parent directory of `path`.
///
/// The namespace is the text after the last `.` of the parent directory name.
/// Returns `None` when there is no parent directory name, it is not valid
/// UTF-8, it contains no `.`, or nothing follows the last `.`.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory_name = path
        .parent()
        .and_then(Path::file_name)
        .and_then(|name| name.to_str())?;
    match directory_name.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3447
3448fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3449    let text = path.to_string_lossy();
3450    if text.contains("/remote/") {
3451        let name = path.file_name()?.to_str()?;
3452        let source = name
3453            .split_once('@')
3454            .map(|(prefix, _)| prefix)
3455            .unwrap_or_else(|| {
3456                name.strip_suffix(".journal~.zst")
3457                    .or_else(|| name.strip_suffix(".journal.zst"))
3458                    .or_else(|| name.strip_suffix(".journal~"))
3459                    .or_else(|| name.strip_suffix(".journal"))
3460                    .unwrap_or(name)
3461            });
3462        return source.starts_with("remote-").then(|| source.to_string());
3463    }
3464    local_namespace_source_name(path)
3465}
3466
3467fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3468    if field == "PRIORITY" {
3469        if let Some(priority) = priority_name_to_number(value) {
3470            return priority.as_bytes().to_vec();
3471        }
3472    }
3473    value.as_bytes().to_vec()
3474}
3475
3476fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3477    let Value::Array(items) = value? else {
3478        return None;
3479    };
3480    Some(
3481        items
3482            .iter()
3483            .filter_map(Value::as_str)
3484            .map(ToOwned::to_owned)
3485            .collect(),
3486    )
3487}
3488
3489fn request_direction(object: &Map<String, Value>) -> Direction {
3490    match get_str(object, "direction").unwrap_or("backward") {
3491        "forward" | "forwards" | "next" => Direction::Forward,
3492        _ => Direction::Backward,
3493    }
3494}
3495
3496fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3497    data_only && get_bool(object, "delta").unwrap_or(false)
3498}
3499
3500fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3501    data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3502}
3503
3504fn tail_after_realtime_bound(
3505    after_realtime_usec: Option<u64>,
3506    anchor: ExplorerAnchor,
3507) -> Option<u64> {
3508    let ExplorerAnchor::Realtime(anchor) = anchor else {
3509        return after_realtime_usec;
3510    };
3511    let tail_after = anchor.saturating_add(1);
3512    Some(
3513        after_realtime_usec
3514            .map(|after| after.max(tail_after))
3515            .unwrap_or(tail_after),
3516    )
3517}
3518
3519fn before_realtime_bound_excluding_anchor(
3520    before_realtime_usec: Option<u64>,
3521    anchor: ExplorerAnchor,
3522) -> Option<u64> {
3523    let ExplorerAnchor::Realtime(anchor) = anchor else {
3524        return before_realtime_usec;
3525    };
3526    let before_anchor = anchor.saturating_sub(1);
3527    Some(
3528        before_realtime_usec
3529            .map(|before| before.min(before_anchor))
3530            .unwrap_or(before_anchor),
3531    )
3532}
3533
3534fn request_anchor_and_direction(
3535    object: &Map<String, Value>,
3536    tail: bool,
3537    direction: Direction,
3538    after_realtime_usec: Option<u64>,
3539    before_realtime_usec: Option<u64>,
3540) -> (ExplorerAnchor, Direction) {
3541    let anchor = get_u64(object, "anchor")
3542        .filter(|value| *value != 0)
3543        .map(normalize_timestamp_to_usec)
3544        .map(ExplorerAnchor::Realtime)
3545        .unwrap_or(ExplorerAnchor::Auto);
3546    if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3547        return (anchor, Direction::Backward);
3548    }
3549    if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3550        (ExplorerAnchor::Auto, Direction::Backward)
3551    } else {
3552        (anchor, direction)
3553    }
3554}
3555
3556fn anchor_outside_window(
3557    anchor: ExplorerAnchor,
3558    after_realtime_usec: Option<u64>,
3559    before_realtime_usec: Option<u64>,
3560) -> bool {
3561    let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3562        return false;
3563    };
3564    after_realtime_usec.is_some_and(|after| anchor_usec < after)
3565        || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3566}
3567
3568fn request_limit(object: &Map<String, Value>) -> usize {
3569    get_u64(object, "last")
3570        .filter(|value| *value != 0)
3571        .map(|value| value as usize)
3572        .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3573}
3574
3575fn request_facets(
3576    requested_facets: &Option<Vec<String>>,
3577    config: &NetdataFunctionConfig,
3578) -> Vec<Vec<u8>> {
3579    requested_facets
3580        .clone()
3581        .unwrap_or_else(|| config.default_facets.clone())
3582        .into_iter()
3583        .map(Vec::from)
3584        .collect()
3585}
3586
3587fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3588    get_str(object, "histogram")
3589        .filter(|histogram| !histogram.is_empty())
3590        .map(ToOwned::to_owned)
3591}
3592
3593fn request_histogram_or_default(
3594    requested_histogram: &Option<String>,
3595    config: &NetdataFunctionConfig,
3596) -> Option<String> {
3597    requested_histogram
3598        .clone()
3599        .or_else(|| config.default_histogram.clone())
3600}
3601
3602fn request_query(object: &Map<String, Value>) -> Option<String> {
3603    get_str(object, "query")
3604        .filter(|query| !query.is_empty())
3605        .map(ToOwned::to_owned)
3606}
3607
3608fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3609    object.get(key).and_then(Value::as_bool)
3610}
3611
3612fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3613    object.get(key).and_then(Value::as_i64)
3614}
3615
3616fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3617    object.get(key).and_then(Value::as_u64)
3618}
3619
3620fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3621    object.get(key).and_then(Value::as_str)
3622}
3623
3624fn normalize_time_window(
3625    now_seconds: i64,
3626    after: Option<i64>,
3627    before: Option<i64>,
3628) -> (Option<u64>, Option<u64>) {
3629    let mut after = after.unwrap_or(0);
3630    let mut before = before.unwrap_or(0);
3631
3632    if after == 0 && before == 0 {
3633        before = now_seconds;
3634        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3635    } else {
3636        (after, before) = relative_window_to_absolute(now_seconds, after, before);
3637    }
3638
3639    if after > before {
3640        std::mem::swap(&mut after, &mut before);
3641    }
3642    if after == before {
3643        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3644    }
3645
3646    (
3647        Some(normalize_timestamp_to_usec_with_rounding(
3648            after.max(0) as u64,
3649            false,
3650        )),
3651        Some(normalize_timestamp_to_usec_with_rounding(
3652            before.max(0) as u64,
3653            true,
3654        )),
3655    )
3656}
3657
3658fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3659    let mut after = after;
3660    let mut before = before;
3661
3662    if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3663        if before > 0 {
3664            before = -before;
3665        }
3666        before = now_seconds.saturating_add(before);
3667    }
3668
3669    if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3670        if after > 0 {
3671            after = -after;
3672        }
3673        if after == 0 {
3674            after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3675        }
3676        after = before.saturating_add(after).saturating_add(1);
3677    }
3678
3679    if after > before {
3680        std::mem::swap(&mut after, &mut before);
3681    }
3682
3683    if before > now_seconds {
3684        let delta = before.saturating_sub(now_seconds);
3685        before = before.saturating_sub(delta);
3686        after = after.saturating_sub(delta);
3687    }
3688
3689    (after, before)
3690}
3691
3692struct RequestEchoInput<'a> {
3693    info: bool,
3694    after_realtime_usec: Option<u64>,
3695    before_realtime_usec: Option<u64>,
3696    if_modified_since_usec: u64,
3697    anchor: ExplorerAnchor,
3698    direction: Direction,
3699    limit: usize,
3700    data_only: bool,
3701    delta: bool,
3702    tail: bool,
3703    sampling: u64,
3704    source_type: u64,
3705    requested_facets: Option<&'a [String]>,
3706    selections: Option<&'a Value>,
3707    histogram: Option<&'a str>,
3708    query: Option<&'a str>,
3709}
3710
3711fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3712    let anchor_usec = match input.anchor {
3713        ExplorerAnchor::Realtime(usec) => usec,
3714        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3715    };
3716    let mut out = json!({
3717        "info": input.info,
3718        // The SDK Netdata boundary always uses indexed slice semantics. The
3719        // field remains in the echo because it is part of the plugin request
3720        // shape and downstream fixtures compare normalized requests.
3721        "slice": true,
3722        "data_only": input.data_only,
3723        "delta": input.delta,
3724        "tail": input.tail,
3725        "sampling": input.sampling,
3726        "source_type": input.source_type,
3727        "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3728        "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3729        "if_modified_since": input.if_modified_since_usec,
3730        "anchor": anchor_usec,
3731        "direction": match input.direction {
3732            Direction::Forward => "forward",
3733            Direction::Backward => "backward",
3734        },
3735        "last": input.limit,
3736        "query": input.query,
3737        "histogram": input.histogram,
3738    });
3739    if let Some(facets) = input.requested_facets {
3740        if let Some(object) = out.as_object_mut() {
3741            object.insert(
3742                "facets".to_string(),
3743                facets
3744                    .iter()
3745                    .map(|field| Value::String(field.clone()))
3746                    .collect(),
3747            );
3748        }
3749    }
3750    if let Some(Value::Object(selections)) = input.selections {
3751        let mut selections = selections.clone();
3752        if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3753            for source in sources {
3754                *source = Value::Null;
3755            }
3756        }
3757        if let Some(object) = out.as_object_mut() {
3758            object.insert("selections".to_string(), Value::Object(selections));
3759        }
3760    }
3761    out
3762}
3763
3764fn normalize_timestamp_to_usec(value: u64) -> u64 {
3765    normalize_timestamp_to_usec_with_rounding(value, false)
3766}
3767
/// Converts a timestamp to microseconds.
///
/// Values of at least `1_000_000_000_000` are returned unchanged. Smaller
/// values are multiplied by one million; with `end_of_second` set, `999_999`
/// is added so the result is the last microsecond of that second.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const FIRST_VALUE_KEPT_AS_IS: u64 = 1_000_000_000_000;

    if value >= FIRST_VALUE_KEPT_AS_IS {
        return value;
    }
    let start_of_second = value.saturating_mul(1_000_000);
    if end_of_second {
        start_of_second.saturating_add(999_999)
    } else {
        start_of_second
    }
}
3777
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3784
/// Formats a byte count with binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Counts below 1024 are printed exactly. Larger counts are scaled by 1024
/// until they drop below 1024 or `TiB` is reached, then printed with at most
/// two decimals and without trailing zeros or a trailing decimal point.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];

    if bytes < 1024 {
        return format!("{bytes}B");
    }

    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index < UNITS.len() - 1 {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let unit = UNITS[unit_index];

    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{unit}");
    }
    // `{:.2}` always prints a decimal point, so only fractional zeros are trimmed.
    let rounded = format!("{scaled:.2}");
    let trimmed = rounded.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{unit}")
}
3808
/// Formats a duration in seconds as space-separated parts such as
/// `1y 2mo 3d 4h 5m 6s`.
///
/// A year counts as 365 days and a month as 30 days. Parts that are zero are
/// omitted, except that a zero duration is printed as `0s`.
fn human_duration_seconds(seconds: u64) -> String {
    const UNITS: [(u64, &str); 5] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for &(unit_seconds, suffix) in UNITS.iter() {
        let count = remaining / unit_seconds;
        remaining %= unit_seconds;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3840
/// Result of scanning a directory tree for journal files.
#[derive(Default, Debug)]
struct JournalFileCollection {
    /// Paths of the journal files that were found.
    files: Vec<PathBuf>,
    /// Number of directories that were skipped during the scan.
    skipped: u64,
    /// One message per skipped directory, explaining why it was skipped.
    errors: Vec<String>,
}
3847
3848fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
3849    if !path.is_dir() {
3850        return Err(SdkError::InvalidPath(format!(
3851            "not a directory: {}",
3852            path.display()
3853        )));
3854    }
3855    let mut collection = JournalFileCollection::default();
3856    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
3857    let mut visited = HashSet::new();
3858    while let Some((directory, depth)) = pending.pop_front() {
3859        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
3860        if visited.contains(&visited_key) {
3861            continue;
3862        }
3863        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
3864            collection.skipped = collection.skipped.saturating_add(1);
3865            collection.errors.push(format!(
3866                "{}: directory scan limit reached",
3867                directory.display()
3868            ));
3869            continue;
3870        }
3871        visited.insert(visited_key);
3872        let entries = match std::fs::read_dir(&directory) {
3873            Ok(entries) => entries,
3874            Err(err) if directory == path => return Err(err.into()),
3875            Err(err) => {
3876                collection.skipped = collection.skipped.saturating_add(1);
3877                collection
3878                    .errors
3879                    .push(format!("{}: {err}", directory.display()));
3880                continue;
3881            }
3882        };
3883        for entry in entries.flatten() {
3884            let entry_path = entry.path();
3885            if entry_path.is_file() && is_journal_file_name(&entry_path) {
3886                collection.files.push(entry_path);
3887            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
3888                pending.push_back((entry_path, depth + 1));
3889            }
3890        }
3891    }
3892    collection.files.sort();
3893    dedup_journal_files_by_canonical_path(&mut collection.files);
3894    Ok(collection)
3895}
3896
/// Removes paths that resolve to a file already seen earlier in the list.
///
/// Paths are compared by their canonical form; a path that cannot be
/// canonicalised is compared as given. The first occurrence is kept and the
/// relative order of the remaining paths is preserved.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut seen_targets: HashSet<PathBuf> = HashSet::with_capacity(files.len());
    files.retain(|candidate| {
        let target = match std::fs::canonicalize(candidate) {
            Ok(resolved) => resolved,
            Err(_) => candidate.clone(),
        };
        seen_targets.insert(target)
    });
}
3904
/// Timestamps (in microseconds) used to order journal files for a query.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime of the first message; `0` when the file could not be opened.
    msg_first_realtime_usec: u64,
    /// Realtime of the last message, or the modification time when unknown.
    msg_last_realtime_usec: u64,
    /// Last modification time of the file.
    file_last_modified_usec: u64,
    /// Journal-vs-realtime delta associated with the file.
    journal_vs_realtime_delta_usec: u64,
}
3912
3913fn journal_file_order_info(
3914    path: &Path,
3915    reader_options: ReaderOptions,
3916    metadata: Option<&NetdataJournalFileMetadata>,
3917) -> JournalFileOrderInfo {
3918    let file_last_modified_usec = std::fs::metadata(path)
3919        .ok()
3920        .and_then(|metadata| metadata.modified().ok())
3921        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3922        .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3923        .unwrap_or_default();
3924    let file_last_modified_usec = metadata
3925        .and_then(|metadata| metadata.file_last_modified_usec)
3926        .unwrap_or(file_last_modified_usec);
3927    let journal_vs_realtime_delta_usec = metadata
3928        .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3929        .map(normalize_journal_vs_realtime_delta_usec)
3930        .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3931
3932    let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3933        return JournalFileOrderInfo {
3934            msg_first_realtime_usec: 0,
3935            msg_last_realtime_usec: file_last_modified_usec,
3936            file_last_modified_usec,
3937            journal_vs_realtime_delta_usec,
3938        };
3939    };
3940    let header = reader.header();
3941    let msg_first_realtime_usec = metadata
3942        .and_then(|metadata| metadata.msg_first_realtime_usec)
3943        .unwrap_or(header.head_entry_realtime);
3944    let msg_last_realtime_usec = metadata
3945        .and_then(|metadata| metadata.msg_last_realtime_usec)
3946        .unwrap_or_else(|| {
3947            if header.tail_entry_realtime == 0 {
3948                file_last_modified_usec
3949            } else {
3950                header.tail_entry_realtime
3951            }
3952        });
3953    JournalFileOrderInfo {
3954        msg_first_realtime_usec,
3955        msg_last_realtime_usec,
3956        file_last_modified_usec,
3957        journal_vs_realtime_delta_usec,
3958    }
3959}
3960
3961fn compare_journal_file_order(
3962    left: &JournalFileOrderInfo,
3963    right: &JournalFileOrderInfo,
3964    direction: Direction,
3965) -> Ordering {
3966    let backward = right
3967        .msg_last_realtime_usec
3968        .cmp(&left.msg_last_realtime_usec)
3969        .then_with(|| {
3970            right
3971                .file_last_modified_usec
3972                .cmp(&left.file_last_modified_usec)
3973        })
3974        .then_with(|| {
3975            right
3976                .msg_first_realtime_usec
3977                .cmp(&left.msg_first_realtime_usec)
3978        });
3979    match direction {
3980        Direction::Backward => backward,
3981        Direction::Forward => backward.reverse(),
3982    }
3983}
3984
/// Reports whether the file name of `path` ends with a journal suffix:
/// `.journal`, `.journal~`, `.journal.zst` or `.journal~.zst`.
///
/// Paths without a file name, or with a file name that is not valid UTF-8,
/// are not journal files.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] = [".journal", ".journal~", ".journal.zst", ".journal~.zst"];

    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => JOURNAL_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3995
3996fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3997    for value in values {
3998        push_unique(target, value);
3999    }
4000}
4001
/// Converts field names to strings, dropping any that are not valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    fields
        .iter()
        .filter_map(|bytes| std::str::from_utf8(bytes).ok())
        .map(str::to_owned)
        .collect()
}
4008
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    let already_present = target.iter().any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
4015
/// Builds the key used to reorder names: leading ASCII punctuation is
/// stripped and ASCII letters are lowercased. Other characters are kept.
fn netdata_reorder_key(value: &str) -> String {
    value
        .chars()
        .skip_while(char::is_ascii_punctuation)
        .map(|character| character.to_ascii_lowercase())
        .collect()
}
4021
4022fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
4023    histogram
4024        .buckets
4025        .first()
4026        .map(|bucket| {
4027            bucket
4028                .end_realtime_usec
4029                .saturating_sub(bucket.start_realtime_usec)
4030                .checked_div(1_000_000)
4031                .unwrap_or(1)
4032                .max(1)
4033        })
4034        .unwrap_or(1)
4035}
4036
/// Sub-second precision used when formatting a realtime timestamp.
enum TimestampPrecision {
    /// Format down to whole seconds.
    Seconds,
    /// Format with six fractional digits (microseconds).
    Micros,
}
4041
4042fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
4043    let seconds = (timestamp / 1_000_000) as i64;
4044    let micros = (timestamp % 1_000_000) as u32;
4045    DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4046        .map(|datetime| match precision {
4047            TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4048            TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4049        })
4050        .unwrap_or_else(|| timestamp.to_string())
4051}
4052
4053fn priority_name(raw: &str) -> Option<&'static str> {
4054    match parse_priority(raw)? {
4055        0 => Some("panic"),
4056        1 => Some("alert"),
4057        2 => Some("critical"),
4058        3 => Some("error"),
4059        4 => Some("warning"),
4060        5 => Some("notice"),
4061        6 => Some("info"),
4062        7 => Some("debug"),
4063        _ => None,
4064    }
4065}
4066
/// Maps a priority name or alias to its numeric string (`"0"`..=`"7"`).
///
/// Matching is exact and case-sensitive; unknown names yield `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: [(&[&str], &str); 8] = [
        (&["panic", "emergency", "emerg"], "0"),
        (&["alert"], "1"),
        (&["critical", "crit"], "2"),
        (&["error", "err"], "3"),
        (&["warning", "warn"], "4"),
        (&["notice"], "5"),
        (&["info"], "6"),
        (&["debug"], "7"),
    ];
    ALIASES
        .iter()
        .find(|(names, _)| names.iter().any(|name| *name == value))
        .map(|(_, number)| *number)
}
4080
/// Parses a priority as a `u8`, returning `None` when `raw` is not a valid
/// `u8` literal.
fn parse_priority(raw: &str) -> Option<u8> {
    match raw.parse::<u8>() {
        Ok(priority) => Some(priority),
        Err(_) => None,
    }
}
4084
4085fn priority_to_row_severity(raw: &[u8]) -> &'static str {
4086    let raw = String::from_utf8_lossy(raw);
4087    match parse_priority(&raw) {
4088        Some(priority) if priority <= 3 => "critical",
4089        Some(4) => "warning",
4090        Some(5) => "notice",
4091        Some(priority) if priority >= 7 => "debug",
4092        _ => "normal",
4093    }
4094}
4095
/// Maps a numeric syslog facility string to its name.
///
/// Facilities `0`..=`11` and `16`..=`23` are known; anything else, including
/// input that is not a `u8`, yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    const FACILITIES: [(u8, &str); 20] = [
        (0, "kern"),
        (1, "user"),
        (2, "mail"),
        (3, "daemon"),
        (4, "auth"),
        (5, "syslog"),
        (6, "lpr"),
        (7, "news"),
        (8, "uucp"),
        (9, "cron"),
        (10, "authpriv"),
        (11, "ftp"),
        (16, "local0"),
        (17, "local1"),
        (18, "local2"),
        (19, "local3"),
        (20, "local4"),
        (21, "local5"),
        (22, "local6"),
        (23, "local7"),
    ];
    let code: u8 = raw.parse().ok()?;
    FACILITIES
        .iter()
        .find(|(candidate, _)| *candidate == code)
        .map(|(_, name)| *name)
}
4121
/// Errno numbers paired with their symbolic names, in ascending numeric order.
///
/// Numbers 41 and 58 have no entry in this table.
#[rustfmt::skip]
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"), (2, "ENOENT"), (3, "ESRCH"), (4, "EINTR"),
    (5, "EIO"), (6, "ENXIO"), (7, "E2BIG"), (8, "ENOEXEC"),
    (9, "EBADF"), (10, "ECHILD"), (11, "EAGAIN"), (12, "ENOMEM"),
    (13, "EACCES"), (14, "EFAULT"), (15, "ENOTBLK"), (16, "EBUSY"),
    (17, "EEXIST"), (18, "EXDEV"), (19, "ENODEV"), (20, "ENOTDIR"),
    (21, "EISDIR"), (22, "EINVAL"), (23, "ENFILE"), (24, "EMFILE"),
    (25, "ENOTTY"), (26, "ETXTBSY"), (27, "EFBIG"), (28, "ENOSPC"),
    (29, "ESPIPE"), (30, "EROFS"), (31, "EMLINK"), (32, "EPIPE"),
    (33, "EDOM"), (34, "ERANGE"), (35, "EDEADLK"), (36, "ENAMETOOLONG"),
    (37, "ENOLCK"), (38, "ENOSYS"), (39, "ENOTEMPTY"), (40, "ELOOP"),
    (42, "ENOMSG"), (43, "EIDRM"), (44, "ECHRNG"), (45, "EL2NSYNC"),
    (46, "EL3HLT"), (47, "EL3RST"), (48, "ELNRNG"), (49, "EUNATCH"),
    (50, "ENOCSI"), (51, "EL2HLT"), (52, "EBADE"), (53, "EBADR"),
    (54, "EXFULL"), (55, "ENOANO"), (56, "EBADRQC"), (57, "EBADSLT"),
    (59, "EBFONT"), (60, "ENOSTR"), (61, "ENODATA"), (62, "ETIME"),
    (63, "ENOSR"), (64, "ENONET"), (65, "ENOPKG"), (66, "EREMOTE"),
    (67, "ENOLINK"), (68, "EADV"), (69, "ESRMNT"), (70, "ECOMM"),
    (71, "EPROTO"), (72, "EMULTIHOP"), (73, "EDOTDOT"), (74, "EBADMSG"),
    (75, "EOVERFLOW"), (76, "ENOTUNIQ"), (77, "EBADFD"), (78, "EREMCHG"),
    (79, "ELIBACC"), (80, "ELIBBAD"), (81, "ELIBSCN"), (82, "ELIBMAX"),
    (83, "ELIBEXEC"), (84, "EILSEQ"), (85, "ERESTART"), (86, "ESTRPIPE"),
    (87, "EUSERS"), (88, "ENOTSOCK"), (89, "EDESTADDRREQ"), (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"), (92, "ENOPROTOOPT"), (93, "EPROTONOSUPPORT"), (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"), (96, "EPFNOSUPPORT"), (97, "EAFNOSUPPORT"), (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"), (100, "ENETDOWN"), (101, "ENETUNREACH"), (102, "ENETRESET"),
    (103, "ECONNABORTED"), (104, "ECONNRESET"), (105, "ENOBUFS"), (106, "EISCONN"),
    (107, "ENOTCONN"), (108, "ESHUTDOWN"), (109, "ETOOMANYREFS"), (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"), (112, "EHOSTDOWN"), (113, "EHOSTUNREACH"), (114, "EALREADY"),
    (115, "EINPROGRESS"), (116, "ESTALE"), (117, "EUCLEAN"), (118, "ENOTNAM"),
    (119, "ENAVAIL"), (120, "EISNAM"), (121, "EREMOTEIO"), (122, "EDQUOT"),
    (123, "ENOMEDIUM"), (124, "EMEDIUMTYPE"), (125, "ECANCELED"), (126, "ENOKEY"),
    (127, "EKEYEXPIRED"), (128, "EKEYREVOKED"), (129, "EKEYREJECTED"), (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"), (132, "ERFKILL"), (133, "EHWPOISON"),
];
4255
4256fn errno_name(raw: &str) -> Option<String> {
4257    let errno = raw.parse::<u32>().ok()?;
4258    let name = ERRNO_NAMES
4259        .iter()
4260        .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
4261    Some(format!("{errno} ({name})"))
4262}
4263
/// Annotates a hexadecimal capability mask with the names of its set bits.
///
/// The result is `<raw> (<NAME> | <NAME> ...)`, with names in bit order. The
/// input is returned unchanged when it does not start with an ASCII digit,
/// does not parse as a hexadecimal `u64`, is zero, or sets no named bit.
fn cap_effective_display(raw: &str) -> String {
    // Indexed by bit position.
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if starts_with_digit && mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names: Vec<&str> = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if ((mask >> bit) & 1) == 1 {
            names.push(*name);
        }
    }

    if names.is_empty() {
        return raw.to_string();
    }
    format!("{raw} ({})", names.join(" | "))
}
4328
4329fn systemd_field_display_value(
4330    context: &DisplayContext,
4331    scope: DisplayScope,
4332    field: &str,
4333    value: &[u8],
4334    resolve_user_group_names: bool,
4335) -> Value {
4336    let raw = String::from_utf8_lossy(value);
4337    match field {
4338        "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
4339        "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
4340        "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
4341        "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
4342            (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
4343            (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
4344            (None, _) => raw.into_owned(),
4345        }),
4346        "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
4347            (Some(timestamp), DisplayScope::Data) => format!(
4348                "{} ({})  ",
4349                raw,
4350                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4351            ),
4352            (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
4353                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4354            }
4355            (None, _) => raw.into_owned(),
4356        }),
4357        "_UID"
4358        | "_SYSTEMD_OWNER_UID"
4359        | "OBJECT_SYSTEMD_OWNER_UID"
4360        | "OBJECT_UID"
4361        | "_AUDIT_LOGINUID"
4362        | "OBJECT_AUDIT_LOGINUID" => {
4363            if resolve_user_group_names {
4364                Value::String(cached_uid_display(context, raw.as_ref()))
4365            } else {
4366                Value::String(raw.into_owned())
4367            }
4368        }
4369        "_GID" | "OBJECT_GID" => {
4370            if resolve_user_group_names {
4371                Value::String(cached_gid_display(context, raw.as_ref()))
4372            } else {
4373                Value::String(raw.into_owned())
4374            }
4375        }
4376        "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
4377        "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
4378            Ok(timestamp) if timestamp != 0 => {
4379                format!(
4380                    "{} ({})",
4381                    raw,
4382                    format_realtime_usec(timestamp, TimestampPrecision::Micros)
4383                )
4384            }
4385            _ => raw.into_owned(),
4386        }),
4387        _ => Value::String(raw.into_owned()),
4388    }
4389}
4390
4391fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4392    if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4393        return value.clone();
4394    }
4395    let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4396    context
4397        .uid_display_cache
4398        .borrow_mut()
4399        .insert(raw.to_string(), value.clone());
4400    value
4401}
4402
4403fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4404    if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4405        return value.clone();
4406    }
4407    let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4408    context
4409        .gid_display_cache
4410        .borrow_mut()
4411        .insert(raw.to_string(), value.clone());
4412    value
4413}
4414
4415#[cfg(unix)]
4416fn resolve_uid_name(raw: &str) -> Option<String> {
4417    let uid = raw.parse::<libc::uid_t>().ok()?;
4418    let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4419    let mut result = std::ptr::null_mut();
4420    let mut buffer = vec![0i8; 16_384];
4421    let rc = unsafe {
4422        libc::getpwuid_r(
4423            uid,
4424            pwd.as_mut_ptr(),
4425            buffer.as_mut_ptr(),
4426            buffer.len(),
4427            &mut result,
4428        )
4429    };
4430    if rc != 0 || result.is_null() {
4431        return None;
4432    }
4433    let pwd = unsafe { pwd.assume_init() };
4434    Some(
4435        unsafe { CStr::from_ptr(pwd.pw_name) }
4436            .to_string_lossy()
4437            .into_owned(),
4438    )
4439}
4440
/// Non-Unix fallback for uid resolution: always returns `None`, so callers that fall back
/// on `None` (such as `cached_uid_display`) display the raw value.
#[cfg(not(unix))]
fn resolve_uid_name(_raw: &str) -> Option<String> {
    None
}
4445
4446#[cfg(unix)]
4447fn resolve_gid_name(raw: &str) -> Option<String> {
4448    let gid = raw.parse::<libc::gid_t>().ok()?;
4449    let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4450    let mut result = std::ptr::null_mut();
4451    let mut buffer = vec![0i8; 16_384];
4452    let rc = unsafe {
4453        libc::getgrgid_r(
4454            gid,
4455            grp.as_mut_ptr(),
4456            buffer.as_mut_ptr(),
4457            buffer.len(),
4458            &mut result,
4459        )
4460    };
4461    if rc != 0 || result.is_null() {
4462        return None;
4463    }
4464    let grp = unsafe { grp.assume_init() };
4465    Some(
4466        unsafe { CStr::from_ptr(grp.gr_name) }
4467            .to_string_lossy()
4468            .into_owned(),
4469    )
4470}
4471
/// Non-Unix fallback for gid resolution: always returns `None`, so callers that fall back
/// on `None` (such as `cached_gid_display`) display the raw value.
#[cfg(not(unix))]
fn resolve_gid_name(_raw: &str) -> Option<String> {
    None
}
4476
/// Lookup table pairing known journal `MESSAGE_ID` values with a human-readable name.
///
/// Each key is a 32-character lowercase hexadecimal identifier. `message_id_name` scans
/// the table in order and compares keys for exact string equality.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4792
4793fn message_id_name(raw: &str) -> Option<&'static str> {
4794    MESSAGE_ID_NAMES
4795        .iter()
4796        .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4797}
4798
4799#[cfg(test)]
4800mod tests {
4801    use super::*;
4802    use crate::ExplorerHistogramBucket;
4803    use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4804    use journal_core::repository::File as RepoFile;
4805    use std::cell::Cell;
4806    use std::collections::HashMap;
4807    use tempfile::TempDir;
4808
    /// In-memory `NetdataFunctionState` used by the tests: serves canned per-file metadata
    /// and records every delta update it receives.
    #[derive(Default)]
    struct TestNetdataState {
        /// Metadata handed back by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        /// `(path, delta_usec)` pairs in the order they were reported.
        updates: Vec<(PathBuf, u64)>,
    }
4814
4815    impl NetdataFunctionState for TestNetdataState {
4816        fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4817            self.metadata.get(path).cloned()
4818        }
4819
4820        fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4821            self.updates.push((path.to_path_buf(), delta_usec));
4822        }
4823    }
4824
4825    fn test_uuid(seed: u8) -> uuid::Uuid {
4826        uuid::Uuid::from_bytes([seed; 16])
4827    }
4828
4829    fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4830        write_named_netdata_test_journal(
4831            directory,
4832            "netdata-api-test.journal",
4833            count,
4834            1_700_000_000_000_000,
4835        );
4836    }
4837
4838    fn write_named_netdata_test_journal(
4839        directory: &std::path::Path,
4840        name: &str,
4841        count: usize,
4842        start_realtime_usec: u64,
4843    ) {
4844        write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4845    }
4846
4847    fn write_stepped_netdata_test_journal(
4848        directory: &std::path::Path,
4849        name: &str,
4850        count: usize,
4851        start_realtime_usec: u64,
4852        step_realtime_usec: u64,
4853    ) {
4854        std::fs::create_dir_all(directory).expect("create journal dir");
4855        let path = directory.join(name);
4856        let repo_file = RepoFile::from_path(&path).expect("repo file");
4857        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4858        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4859        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4860        for index in 0..count {
4861            let message = format!("MESSAGE=row-{index}");
4862            let service = if index % 2 == 0 {
4863                b"SERVICE=even".as_slice()
4864            } else {
4865                b"SERVICE=odd".as_slice()
4866            };
4867            let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4868            let realtime = start_realtime_usec
4869                .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4870            writer
4871                .add_entry(&mut file, &payloads, realtime, realtime)
4872                .expect("write entry");
4873        }
4874        file.sync().expect("sync journal");
4875    }
4876
    /// Rows accumulated across pages by `collect_netdata_pages`.
    struct NetdataCollectedPages {
        /// `MESSAGE` column values, in the order the pages returned them.
        messages: Vec<String>,
        /// `timestamp` column values, parallel to `messages`.
        timestamps: Vec<u64>,
    }
4881
4882    fn collect_netdata_pages(
4883        directory: &std::path::Path,
4884        direction: Direction,
4885        page_size: usize,
4886    ) -> NetdataCollectedPages {
4887        let mut out = NetdataCollectedPages {
4888            messages: Vec::new(),
4889            timestamps: Vec::new(),
4890        };
4891        let mut anchor = None;
4892        for page in 0..100 {
4893            let mut request = json!({
4894                "after": 1_700_000_000,
4895                "before": 1_800_000_000,
4896                "last": page_size,
4897                "direction": direction_name(direction),
4898                "data_only": true,
4899            });
4900            if let Some(anchor) = anchor {
4901                request["anchor"] = Value::from(anchor);
4902            }
4903            let response = run_netdata_contract_request(directory, request);
4904            assert_eq!(
4905                response["status"], 200,
4906                "page {page} response = {response:#}"
4907            );
4908            let messages = response_column_strings(&response, "MESSAGE");
4909            let timestamps = response_column_u64s(&response, "timestamp");
4910            assert_eq!(messages.len(), timestamps.len());
4911            if messages.is_empty() {
4912                break;
4913            }
4914            out.messages.extend(messages);
4915            out.timestamps.extend(timestamps.iter().copied());
4916            anchor = Some(match direction {
4917                Direction::Forward => timestamps[0],
4918                Direction::Backward => *timestamps.last().expect("page timestamp"),
4919            });
4920            if timestamps.len() < page_size {
4921                break;
4922            }
4923        }
4924        assert!(!out.messages.is_empty(), "pagination returned no rows");
4925        out
4926    }
4927
4928    fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4929        NetdataJournalFunction::systemd_journal_plugin_compatible()
4930            .run_directory_request_json_with_options(
4931                directory,
4932                &request,
4933                NetdataFunctionRunOptions::from_timeout_seconds(0),
4934            )
4935            .expect("run function")
4936    }
4937
4938    fn direction_name(direction: Direction) -> &'static str {
4939        match direction {
4940            Direction::Forward => "forward",
4941            Direction::Backward => "backward",
4942        }
4943    }
4944
4945    fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4946        let index = response_column_index(response, field);
4947        response["data"]
4948            .as_array()
4949            .expect("data")
4950            .iter()
4951            .map(|row| {
4952                row.as_array().expect("row")[index]
4953                    .as_str()
4954                    .expect("string cell")
4955                    .to_string()
4956            })
4957            .collect()
4958    }
4959
4960    fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4961        let index = response_column_index(response, field);
4962        response["data"]
4963            .as_array()
4964            .expect("data")
4965            .iter()
4966            .map(|row| {
4967                row.as_array().expect("row")[index]
4968                    .as_u64()
4969                    .expect("u64 cell")
4970            })
4971            .collect()
4972    }
4973
4974    fn response_column_index(response: &Value, field: &str) -> usize {
4975        response["columns"][field]["index"]
4976            .as_u64()
4977            .expect("column index") as usize
4978    }
4979
4980    fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4981        for facet in response[key].as_array().expect("facets") {
4982            if facet["id"] != field {
4983                continue;
4984            }
4985            for option in facet["options"].as_array().expect("facet options") {
4986                if option["id"] == value {
4987                    return option["count"].as_u64().expect("facet count");
4988                }
4989            }
4990            panic!("{key} facet {field} missing value {value}");
4991        }
4992        panic!("{key} missing facet {field}");
4993    }
4994
4995    fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4996        let chart = &response[key]["chart"];
4997        let names = chart["view"]["dimensions"]["names"]
4998            .as_array()
4999            .expect("histogram names");
5000        let dimension_index = names
5001            .iter()
5002            .position(|name| name.as_str() == Some(value))
5003            .expect("histogram dimension");
5004        chart["result"]["data"]
5005            .as_array()
5006            .expect("histogram data")
5007            .iter()
5008            .map(|point| {
5009                point.as_array().expect("histogram point")[dimension_index + 1]
5010                    .as_array()
5011                    .expect("histogram cell")[0]
5012                    .as_u64()
5013                    .unwrap_or_default()
5014            })
5015            .sum()
5016    }
5017
5018    fn assert_unique_messages(messages: &[String]) {
5019        let mut seen = HashSet::new();
5020        for message in messages {
5021            assert!(
5022                seen.insert(message),
5023                "duplicate message {message} in {messages:?}"
5024            );
5025        }
5026    }
5027
5028    #[test]
5029    fn parses_netdata_selections_as_and_fields_or_values() {
5030        let request = json!({
5031            "after": 200_000_000,
5032            "before": 200_000_100,
5033            "direction": "forward",
5034            "last": 25,
5035            "facets": ["PRIORITY"],
5036            "selections": {
5037                "PRIORITY": ["warning", "error"],
5038                "_HOSTNAME": ["node-a"],
5039                "__logs_sources": ["all-local-system-logs"],
5040            }
5041        });
5042
5043        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5044            .expect("parse request");
5045        assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5046        assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5047        assert_eq!(parsed.direction, Direction::Forward);
5048        assert_eq!(parsed.limit, 25);
5049        assert_eq!(parsed.filters.len(), 2);
5050        assert_eq!(parsed.filters[0].field, b"PRIORITY");
5051        assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5052        assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5053        assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5054    }
5055
5056    #[test]
5057    fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5058        let request = json!({
5059            "after": 200_000_000,
5060            "before": 200_000_100,
5061            "last": 1
5062        });
5063
5064        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5065            .expect("parse request");
5066        let query =
5067            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5068
5069        assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5070        assert_eq!(parsed.limit, 2);
5071        assert_eq!(query.limit, 2);
5072    }
5073
5074    #[test]
5075    fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5076        let request = json!({
5077            "after": 200_000_000,
5078            "before": 200_000_100,
5079            "facets": ["PRIORITY"],
5080            "selections": {
5081                "PRIORITY": ["warning"]
5082            }
5083        });
5084
5085        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5086            .expect("parse request");
5087        let query =
5088            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5089
5090        assert!(!query.exclude_facet_field_filters);
5091    }
5092
5093    #[test]
5094    fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5095        let request = json!({
5096            "after": 200_000_000,
5097            "before": 200_000_100,
5098            "facets": ["PRIORITY", "_BOOT_ID"],
5099            "selections": {
5100                "PRIORITY": ["warning"],
5101                "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5102            }
5103        });
5104
5105        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5106            .expect("parse request");
5107        let query =
5108            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5109
5110        assert!(query.exclude_facet_field_filters);
5111    }
5112
5113    #[test]
5114    fn parses_netdata_fts_query_like_simple_pattern() {
5115        let (terms, positives, negatives) =
5116            parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5117
5118        assert_eq!(
5119            positives,
5120            vec![
5121                b"error".to_vec(),
5122                b"warning".to_vec(),
5123                b"escaped|pipe".to_vec(),
5124                b"!literal".to_vec(),
5125                b" a*B".to_vec(),
5126            ]
5127        );
5128        assert_eq!(negatives, vec![b"debug".to_vec()]);
5129        assert_eq!(terms.len(), 6);
5130        assert!(!terms[0].negative);
5131        assert!(terms[2].negative);
5132        assert_eq!(
5133            terms[5],
5134            ExplorerFtsPattern {
5135                parts: vec![b" a".to_vec(), b"B".to_vec()],
5136                negative: false,
5137            }
5138        );
5139
5140        let request = json!({
5141            "query": r"alpha|!debug|needle\|pipe",
5142            "facets": ["PRIORITY"],
5143        });
5144        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5145            .expect("parse request");
5146        let query =
5147            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5148        assert_eq!(
5149            query.fts_patterns,
5150            vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5151        );
5152        assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5153        assert_eq!(query.fts_terms.len(), 3);
5154    }
5155
5156    #[test]
5157    fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5158        let request = json!({
5159            "facets": ["PRIORITY", "_HOSTNAME"],
5160            "histogram": "PRIORITY",
5161            "last": 25
5162        });
5163
5164        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5165            .expect("parse request");
5166        let query =
5167            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5168
5169        assert!(!query.debug_collect_column_fields_by_row_traversal);
5170    }
5171
5172    #[test]
5173    fn netdata_function_reports_requested_facet_groups_even_when_absent() {
5174        let dir = TempDir::new().expect("tempdir");
5175        write_netdata_test_journal(dir.path(), 10);
5176        let request = json!({
5177            "after": 1_700_000_000,
5178            "before": 1_700_000_010,
5179            "facets": ["SERVICE", "MISSING_FIELD"],
5180            "histogram": "SERVICE",
5181            "last": 5,
5182            "slice": true
5183        });
5184        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5185
5186        let response = function
5187            .run_directory_request_json_with_options(
5188                dir.path(),
5189                &request,
5190                NetdataFunctionRunOptions::from_timeout_seconds(0),
5191            )
5192            .expect("run function");
5193
5194        let columns = response["columns"].as_object().expect("columns");
5195        assert!(columns.contains_key("SERVICE"));
5196        assert!(columns.contains_key("MISSING_FIELD"));
5197        let facets = response["facets"].as_array().expect("facets");
5198        assert_eq!(
5199            facets
5200                .iter()
5201                .filter_map(|facet| facet["id"].as_str())
5202                .collect::<Vec<_>>(),
5203            vec!["SERVICE", "MISSING_FIELD"]
5204        );
5205        assert_eq!(
5206            facets[1]["options"].as_array().expect("missing options"),
5207            &Vec::<Value>::new()
5208        );
5209        let accepted = response["accepted_params"]
5210            .as_array()
5211            .expect("accepted params");
5212        assert!(accepted.iter().any(|value| value == "SERVICE"));
5213        assert!(accepted.iter().any(|value| value == "MISSING_FIELD"));
5214        let histograms = response["available_histograms"]
5215            .as_array()
5216            .expect("available histograms");
5217        assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5218        assert!(
5219            histograms
5220                .iter()
5221                .any(|value| value["id"] == "MISSING_FIELD")
5222        );
5223    }
5224
5225    #[test]
5226    fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5227        let dir = TempDir::new().expect("tempdir");
5228        write_netdata_test_journal(dir.path(), 10);
5229        let request = json!({
5230            "after": 1_700_000_000,
5231            "before": 1_700_000_010,
5232            "facets": ["PRIORITY"],
5233            "histogram": "PRIORITY",
5234            "selections": {
5235                "SERVICE": ["missing"]
5236            },
5237            "last": 5,
5238            "slice": true
5239        });
5240        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5241
5242        let response = function
5243            .run_directory_request_json_with_options(
5244                dir.path(),
5245                &request,
5246                NetdataFunctionRunOptions::from_timeout_seconds(0),
5247            )
5248            .expect("run function");
5249
5250        let facets = response["facets"].as_array().expect("facets");
5251        assert_eq!(facets.len(), 1);
5252        assert_eq!(facets[0]["id"], "PRIORITY");
5253        let options = facets[0]["options"].as_array().expect("options");
5254        assert!(options.iter().any(|option| {
5255            option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5256        }));
5257        assert_eq!(response["items"]["matched"], 0);
5258    }
5259
5260    #[test]
5261    fn netdata_function_api_reports_progress() {
5262        let dir = TempDir::new().expect("tempdir");
5263        write_netdata_test_journal(dir.path(), 9_000);
5264        let request = json!({
5265            "after": 1_700_000_000,
5266            "before": 1_700_000_010,
5267            "facets": ["SERVICE"],
5268            "histogram": "SERVICE",
5269            "last": 0
5270        });
5271        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5272        let mut reports = 0u64;
5273        let mut progress = |progress: NetdataFunctionProgress| {
5274            reports = reports.saturating_add(1);
5275            assert_eq!(progress.current_file, 1);
5276            assert_eq!(progress.total_files, 1);
5277            assert!(progress.stats.rows_examined <= 9_000);
5278        };
5279        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5280        options.progress_interval = Duration::ZERO;
5281        options.progress_callback = Some(&mut progress);
5282
5283        let response = function
5284            .run_directory_request_json_with_options(dir.path(), &request, options)
5285            .expect("run function");
5286
5287        assert_eq!(response["status"], 200);
5288        assert!(reports > 0);
5289        assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5290    }
5291
5292    #[test]
5293    fn netdata_function_api_reports_file_end_progress_for_small_scans() {
5294        let dir = TempDir::new().expect("tempdir");
5295        write_netdata_test_journal(dir.path(), 10);
5296        let request = json!({
5297            "after": 1_700_000_000,
5298            "before": 1_700_000_010,
5299            "facets": ["SERVICE"],
5300            "histogram": "SERVICE",
5301            "last": 0
5302        });
5303        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5304        let mut reports = 0u64;
5305        let mut last_rows_examined = 0u64;
5306        let mut progress = |progress: NetdataFunctionProgress| {
5307            reports = reports.saturating_add(1);
5308            last_rows_examined = progress.stats.rows_examined;
5309        };
5310        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5311        options.progress_callback = Some(&mut progress);
5312
5313        let response = function
5314            .run_directory_request_json_with_options(dir.path(), &request, options)
5315            .expect("run function");
5316
5317        assert_eq!(response["status"], 200);
5318        assert_eq!(reports, 1);
5319        assert_eq!(last_rows_examined, 10);
5320    }
5321
5322    #[test]
5323    fn netdata_function_progress_counts_only_query_files() {
5324        let dir = TempDir::new().expect("tempdir");
5325        write_named_netdata_test_journal(
5326            dir.path(),
5327            "old-window.journal",
5328            10,
5329            1_600_000_000_000_000,
5330        );
5331        write_named_netdata_test_journal(
5332            dir.path(),
5333            "current-window.journal",
5334            10,
5335            1_700_000_000_000_000,
5336        );
5337        let request = json!({
5338            "after": 1_700_000_000,
5339            "before": 1_700_000_010,
5340            "facets": ["SERVICE"],
5341            "histogram": "SERVICE",
5342            "last": 0
5343        });
5344        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5345        let mut reports = Vec::new();
5346        let mut progress = |progress: NetdataFunctionProgress| {
5347            reports.push((progress.current_file, progress.total_files));
5348        };
5349        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5350        options.progress_callback = Some(&mut progress);
5351
5352        let response = function
5353            .run_directory_request_json_with_options(dir.path(), &request, options)
5354            .expect("run function");
5355
5356        assert_eq!(response["status"], 200);
5357        assert_eq!(response["_journal_files"]["matched"], 1);
5358        assert_eq!(reports, vec![(1, 1)]);
5359    }
5360
5361    #[test]
5362    fn netdata_function_api_reports_cancellation() {
5363        let dir = TempDir::new().expect("tempdir");
5364        write_netdata_test_journal(dir.path(), 9_000);
5365        let request = json!({
5366            "after": 1_700_000_000,
5367            "before": 1_700_000_010,
5368            "facets": ["SERVICE"],
5369            "histogram": "SERVICE",
5370            "last": 0
5371        });
5372        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5373        let is_cancelled = || true;
5374        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5375        options.cancellation_callback = Some(&is_cancelled);
5376
5377        let response = function
5378            .run_directory_request_json_with_options(dir.path(), &request, options)
5379            .expect("run function");
5380
5381        assert_eq!(response["status"], 499);
5382        assert_eq!(response["errorMessage"], "Request cancelled.");
5383        assert_eq!(
5384            response.as_object().expect("response object").len(),
5385            2,
5386            "plugin-compatible function errors only include status and errorMessage"
5387        );
5388    }
5389
5390    #[test]
5391    fn netdata_function_api_cancels_during_active_scan() {
5392        let dir = TempDir::new().expect("tempdir");
5393        write_netdata_test_journal(dir.path(), 9_000);
5394        let request = json!({
5395            "after": 1_700_000_000,
5396            "before": 1_700_000_010,
5397            "facets": ["SERVICE"],
5398            "histogram": "SERVICE",
5399            "last": 0
5400        });
5401        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5402        let should_cancel = Cell::new(false);
5403        let mut reports = 0u64;
5404        let mut progress = |progress: NetdataFunctionProgress| {
5405            reports = reports.saturating_add(1);
5406            if progress.stats.rows_examined > 0 {
5407                should_cancel.set(true);
5408            }
5409        };
5410        let is_cancelled = || should_cancel.get();
5411        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5412        options.progress_interval = Duration::ZERO;
5413        options.progress_callback = Some(&mut progress);
5414        options.cancellation_callback = Some(&is_cancelled);
5415
5416        let response = function
5417            .run_directory_request_json_with_options(dir.path(), &request, options)
5418            .expect("run function");
5419
5420        assert_eq!(response["status"], 499);
5421        assert_eq!(response["errorMessage"], "Request cancelled.");
5422        assert!(reports > 0);
5423        assert!(should_cancel.get());
5424    }
5425
5426    #[test]
5427    fn netdata_function_api_honors_cancellation_after_final_file_progress() {
5428        let dir = TempDir::new().expect("tempdir");
5429        write_netdata_test_journal(dir.path(), 10);
5430        let request = json!({
5431            "after": 1_700_000_000,
5432            "before": 1_700_000_010,
5433            "facets": ["SERVICE"],
5434            "histogram": "SERVICE",
5435            "last": 0
5436        });
5437        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5438        let should_cancel = Cell::new(false);
5439        let mut progress = |_progress: NetdataFunctionProgress| {
5440            should_cancel.set(true);
5441        };
5442        let is_cancelled = || should_cancel.get();
5443        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5444        options.progress_callback = Some(&mut progress);
5445        options.cancellation_callback = Some(&is_cancelled);
5446
5447        let response = function
5448            .run_directory_request_json_with_options(dir.path(), &request, options)
5449            .expect("run function");
5450
5451        assert_eq!(response["status"], 499);
5452        assert_eq!(response["errorMessage"], "Request cancelled.");
5453        assert!(should_cancel.get());
5454    }
5455
5456    #[test]
5457    fn netdata_function_api_reports_timeout_as_partial_table() {
5458        let dir = TempDir::new().expect("tempdir");
5459        write_netdata_test_journal(dir.path(), 10);
5460        let request = json!({
5461            "after": 1_700_000_000,
5462            "before": 1_700_000_010,
5463            "facets": ["SERVICE"],
5464            "histogram": "SERVICE",
5465            "last": 0
5466        });
5467        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5468        let options = NetdataFunctionRunOptions {
5469            timeout: Some(Duration::ZERO),
5470            ..NetdataFunctionRunOptions::default()
5471        };
5472
5473        let response = function
5474            .run_directory_request_json_with_options(dir.path(), &request, options)
5475            .expect("run function");
5476
5477        assert_eq!(response["status"], 200);
5478        assert_eq!(response["partial"], true);
5479        assert_eq!(response["message"]["status"], "warning");
5480        assert_eq!(
5481            response["message"]["title"],
5482            "Query timed-out, incomplete data. "
5483        );
5484    }
5485
5486    #[test]
5487    fn netdata_function_api_reports_sampling_counters() {
5488        let dir = TempDir::new().expect("tempdir");
5489        write_stepped_netdata_test_journal(
5490            dir.path(),
5491            "netdata-api-test.journal",
5492            5_000,
5493            1_700_000_000_000_000,
5494            1_000,
5495        );
5496        let request = json!({
5497            "after": 1_700_000_000,
5498            "before": 1_700_000_005,
5499            "facets": ["SERVICE"],
5500            "histogram": "SERVICE",
5501            "last": 5,
5502            "sampling": 20
5503        });
5504        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5505
5506        let response = function
5507            .run_directory_request_json_with_options(
5508                dir.path(),
5509                &request,
5510                NetdataFunctionRunOptions::from_timeout_seconds(0),
5511            )
5512            .expect("run function");
5513
5514        assert_eq!(response["status"], 200);
5515        assert!(
5516            response["_sampling"]["sampled"]
5517                .as_u64()
5518                .unwrap_or_default()
5519                > 0
5520        );
5521        assert!(
5522            response["_sampling"]["unsampled"]
5523                .as_u64()
5524                .unwrap_or_default()
5525                > 0
5526        );
5527        assert!(
5528            response["_sampling"]["estimated"]
5529                .as_u64()
5530                .unwrap_or_default()
5531                > 0
5532        );
5533        assert_eq!(
5534            response["items"]["estimated"],
5535            response["_sampling"]["estimated"]
5536        );
5537        assert!(
5538            response["items"]["unsampled"].as_u64().unwrap_or_default()
5539                < response["_sampling"]["unsampled"]
5540                    .as_u64()
5541                    .unwrap_or_default()
5542        );
5543        assert_eq!(response["message"]["status"], "notice");
5544    }
5545
5546    #[test]
5547    fn netdata_function_api_disables_sampling_for_data_only() {
5548        let dir = TempDir::new().expect("tempdir");
5549        write_netdata_test_journal(dir.path(), 5_000);
5550        let request = json!({
5551            "after": 1_700_000_000,
5552            "before": 1_700_000_010,
5553            "data_only": true,
5554            "last": 5,
5555            "sampling": 20
5556        });
5557        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5558
5559        let response = function
5560            .run_directory_request_json_with_options(
5561                dir.path(),
5562                &request,
5563                NetdataFunctionRunOptions::from_timeout_seconds(0),
5564            )
5565            .expect("run function");
5566
5567        assert_eq!(response["status"], 200);
5568        assert!(response.get("_sampling").is_none());
5569    }
5570
5571    #[test]
5572    fn normalizes_missing_time_window_to_last_hour_like_plugin() {
5573        assert_eq!(
5574            normalize_time_window(1_000_000_000, None, None),
5575            (Some(999_996_400_000_000), Some(1_000_000_000_999_999))
5576        );
5577    }
5578
5579    #[test]
5580    fn normalizes_inverted_time_window_like_plugin() {
5581        assert_eq!(
5582            normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000)),
5583            (Some(200_000_000_000_000), Some(200_000_100_999_999))
5584        );
5585    }
5586
5587    #[test]
5588    fn normalizes_equal_time_window_like_plugin() {
5589        assert_eq!(
5590            normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000)),
5591            (Some(199_996_400_000_000), Some(200_000_000_999_999))
5592        );
5593    }
5594
5595    #[test]
5596    fn normalizes_relative_time_window_like_plugin() {
5597        assert_eq!(
5598            normalize_time_window(1_000_000_000, Some(100), Some(200)),
5599            (Some(999_999_701_000_000), Some(999_999_800_999_999))
5600        );
5601    }
5602
5603    #[test]
5604    fn normalizes_missing_after_with_supplied_before_like_plugin() {
5605        assert_eq!(
5606            normalize_time_window(1_000_000_000, None, Some(200_000_000)),
5607            (Some(199_999_401_000_000), Some(200_000_000_999_999))
5608        );
5609    }
5610
5611    #[test]
5612    fn systemd_profile_transforms_priority_and_facility_for_display() {
5613        let profile = SystemdJournalProfile;
5614        let context = DisplayContext::default();
5615        assert_eq!(
5616            profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7"),
5617            json!("debug")
5618        );
5619        assert_eq!(
5620            profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3"),
5621            json!("daemon")
5622        );
5623        assert_eq!(priority_to_row_severity(b"3"), "critical");
5624        assert_eq!(priority_to_row_severity(b"6"), "normal");
5625    }
5626
5627    #[test]
5628    fn dynamic_process_name_matches_plugin_fallback_order() {
5629        let mut fields = BTreeMap::new();
5630        fields.insert("SYSLOG_IDENTIFIER".to_string(), vec![b"syslog".to_vec()]);
5631        fields.insert("_COMM".to_string(), vec![b"comm".to_vec()]);
5632        fields.insert("_PID".to_string(), vec![b"42".to_vec()]);
5633        fields.insert("SYSLOG_PID".to_string(), vec![b"99".to_vec()]);
5634        assert_eq!(dynamic_process_name(&fields), "syslog[42]");
5635
5636        fields.insert("CONTAINER_NAME".to_string(), vec![b"container".to_vec()]);
5637        assert_eq!(dynamic_process_name(&fields), "container[42]");
5638
5639        fields.remove("CONTAINER_NAME");
5640        fields.remove("SYSLOG_IDENTIFIER");
5641        fields.remove("_PID");
5642        assert_eq!(dynamic_process_name(&fields), "comm[-]");
5643
5644        fields.insert("_PID".to_string(), vec![Vec::new()]);
5645        assert_eq!(dynamic_process_name(&fields), "comm");
5646
5647        fields.remove("_COMM");
5648        fields.remove("_PID");
5649        fields.insert("_EXE".to_string(), vec![b"/usr/bin/app".to_vec()]);
5650        assert_eq!(dynamic_process_name(&fields), "-");
5651    }
5652
5653    #[test]
5654    fn facet_values_are_truncated_and_collapsed_like_plugin() {
5655        let prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
5656        let mut first = prefix.clone();
5657        first.extend_from_slice(b"-first");
5658        let mut second = prefix.clone();
5659        second.extend_from_slice(b"-second");
5660
5661        let mut values = BTreeMap::new();
5662        add_netdata_facet_count(&mut values, &first, 2);
5663        add_netdata_facet_count(&mut values, &second, 3);
5664
5665        assert_eq!(values.len(), 1);
5666        assert_eq!(values.get(&prefix), Some(&5));
5667    }
5668
5669    #[test]
5670    fn histogram_values_are_truncated_and_collapsed_like_plugin() {
5671        let prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
5672        let mut first = prefix.clone();
5673        first.extend_from_slice(b"-first");
5674        let mut second = prefix.clone();
5675        second.extend_from_slice(b"-second");
5676
5677        let mut values = HashMap::new();
5678        values.insert(first, 2);
5679        values.insert(second, 3);
5680        let histogram = ExplorerHistogram {
5681            field: b"TEST_FIELD".to_vec(),
5682            buckets: vec![ExplorerHistogramBucket {
5683                start_realtime_usec: 1_000_000,
5684                end_realtime_usec: 2_000_000,
5685                values,
5686            }],
5687        };
5688
5689        let function = NetdataJournalFunction::systemd_journal();
5690        let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);
5691        let labels = rendered["chart"]["result"]["labels"]
5692            .as_array()
5693            .expect("labels");
5694        assert_eq!(labels.len(), 2);
5695        assert_eq!(labels[1], Value::String(String::from_utf8(prefix).unwrap()));
5696        assert_eq!(rendered["chart"]["result"]["data"][0][1][0], json!(5));
5697    }
5698
5699    #[test]
5700    fn histogram_chart_metadata_includes_empty_dimension_arrays() {
5701        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5702        let empty = function.build_histogram(
5703            &DisplayContext::default(),
5704            &ExplorerHistogram {
5705                field: b"TRAP_SEVERITY".to_vec(),
5706                buckets: vec![ExplorerHistogramBucket {
5707                    start_realtime_usec: 1_700_000_000_000_000,
5708                    end_realtime_usec: 1_700_000_005_000_000,
5709                    values: HashMap::new(),
5710                }],
5711            },
5712            None,
5713        );
5714
5715        assert_eq!(empty["chart"]["view"]["dimensions"]["names"], json!([]));
5716        assert_eq!(empty["chart"]["view"]["dimensions"]["ids"], json!([]));
5717        assert_eq!(empty["chart"]["db"]["dimensions"]["names"], json!([]));
5718
5719        let mut values = HashMap::new();
5720        values.insert(b"warning".to_vec(), 7);
5721        let with_value = function.build_histogram(
5722            &DisplayContext::default(),
5723            &ExplorerHistogram {
5724                field: b"TRAP_SEVERITY".to_vec(),
5725                buckets: vec![ExplorerHistogramBucket {
5726                    start_realtime_usec: 1_700_000_000_000_000,
5727                    end_realtime_usec: 1_700_000_005_000_000,
5728                    values,
5729                }],
5730            },
5731            None,
5732        );
5733
5734        assert_eq!(
5735            with_value["chart"]["view"]["dimensions"]["names"],
5736            json!(["warning"])
5737        );
5738        assert_eq!(
5739            with_value["chart"]["view"]["dimensions"]["sts"]["min"],
5740            json!([7])
5741        );
5742    }
5743
5744    #[test]
5745    fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
5746        let mut backward = vec![
5747            test_located_row(100),
5748            test_located_row(100),
5749            test_located_row(100),
5750            test_located_row(90),
5751        ];
5752        make_row_timestamps_unique(&mut backward, Direction::Backward);
5753        assert_eq!(
5754            backward
5755                .iter()
5756                .map(|row| row.row.realtime_usec)
5757                .collect::<Vec<_>>(),
5758            vec![100, 99, 98, 90]
5759        );
5760
5761        let mut forward = vec![
5762            test_located_row(90),
5763            test_located_row(100),
5764            test_located_row(100),
5765            test_located_row(100),
5766        ];
5767        make_row_timestamps_unique(&mut forward, Direction::Forward);
5768        assert_eq!(
5769            forward
5770                .iter()
5771                .map(|row| row.row.realtime_usec)
5772                .collect::<Vec<_>>(),
5773            vec![90, 100, 101, 102]
5774        );
5775    }
5776
5777    #[test]
5778    fn page_window_counts_forward_anchor_like_netdata_facets() {
5779        let config = NetdataFunctionConfig::systemd_journal();
5780        let request = NetdataRequest::parse(
5781            &json!({
5782                "after": 1_700_000_000,
5783                "before": 1_700_000_010,
5784                "anchor": 1_700_000_005_000_000u64,
5785                "direction": "forward",
5786                "last": 2
5787            }),
5788            &config,
5789        )
5790        .expect("parse request");
5791        let mut window = NetdataPageWindow::for_request(&request);
5792
5793        for realtime_usec in [
5794            1_700_000_003_000_000,
5795            1_700_000_004_000_000,
5796            1_700_000_006_000_000,
5797            1_700_000_007_000_000,
5798            1_700_000_008_000_000,
5799        ] {
5800            window.observe(realtime_usec);
5801        }
5802
5803        let counters = window.counters();
5804        assert_eq!(counters.matched, 3);
5805        assert_eq!(counters.before, 1);
5806        assert_eq!(counters.after, 2);
5807    }
5808
5809    #[test]
5810    fn page_window_counts_backward_anchor_like_netdata_facets() {
5811        let config = NetdataFunctionConfig::systemd_journal();
5812        let request = NetdataRequest::parse(
5813            &json!({
5814                "after": 1_700_000_000,
5815                "before": 1_700_000_010,
5816                "anchor": 1_700_000_008_000_000u64,
5817                "direction": "backward",
5818                "last": 2
5819            }),
5820            &config,
5821        )
5822        .expect("parse request");
5823        let mut window = NetdataPageWindow::for_request(&request);
5824
5825        for realtime_usec in [
5826            1_700_000_009_000_000,
5827            1_700_000_007_000_000,
5828            1_700_000_006_000_000,
5829            1_700_000_005_000_000,
5830        ] {
5831            window.observe(realtime_usec);
5832        }
5833
5834        let counters = window.counters();
5835        assert_eq!(counters.matched, 3);
5836        assert_eq!(counters.before, 1);
5837        assert_eq!(counters.after, 1);
5838    }
5839
5840    #[test]
5841    fn page_window_counts_tail_anchor_like_netdata_facets() {
5842        let config = NetdataFunctionConfig::systemd_journal();
5843        let request = NetdataRequest::parse(
5844            &json!({
5845                "after": 1_700_000_000,
5846                "before": 1_700_000_010,
5847                "anchor": 1_700_000_008_000_000u64,
5848                "if_modified_since": 1_700_000_008_000_000u64,
5849                "data_only": true,
5850                "tail": true,
5851                "last": 2
5852            }),
5853            &config,
5854        )
5855        .expect("parse request");
5856        let mut window = NetdataPageWindow::for_request(&request);
5857
5858        for realtime_usec in [
5859            1_700_000_009_000_000,
5860            1_700_000_008_000_000,
5861            1_700_000_007_000_000,
5862        ] {
5863            window.observe(realtime_usec);
5864        }
5865
5866        let counters = window.counters();
5867        assert_eq!(counters.matched, 1);
5868        assert_eq!(counters.before, 0);
5869        assert_eq!(counters.after, 2);
5870    }
5871
5872    #[test]
5873    fn netdata_function_tail_anchor_with_newer_filtered_out_rows_returns_empty_200() {
5874        let dir = TempDir::new().expect("tempdir");
5875        let start_realtime_usec = 1_700_000_000_000_000;
5876        write_stepped_netdata_test_journal(
5877            dir.path(),
5878            "netdata-api-test.journal",
5879            2,
5880            start_realtime_usec,
5881            1_000_000,
5882        );
5883        let request = json!({
5884            "after": 1_700_000_000,
5885            "before": 1_700_000_002,
5886            "anchor": start_realtime_usec,
5887            "if_modified_since": start_realtime_usec,
5888            "data_only": true,
5889            "tail": true,
5890            "last": 5,
5891            "selections": {
5892                "SERVICE": ["even"]
5893            }
5894        });
5895        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5896
5897        let response = function
5898            .run_directory_request_json_with_options(
5899                dir.path(),
5900                &request,
5901                NetdataFunctionRunOptions::from_timeout_seconds(0),
5902            )
5903            .expect("run function");
5904
5905        assert_eq!(response["status"], 200);
5906        assert_eq!(
5907            response_column_strings(&response, "MESSAGE"),
5908            Vec::<String>::new()
5909        );
5910    }
5911
5912    #[test]
5913    fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
5914        let dir = TempDir::new().expect("tempdir");
5915        let start_realtime_usec = 1_700_000_000_000_000;
5916        write_stepped_netdata_test_journal(
5917            dir.path(),
5918            "paging-contract.journal",
5919            7,
5920            start_realtime_usec,
5921            1,
5922        );
5923
5924        let backward = collect_netdata_pages(dir.path(), Direction::Backward, 2);
5925        assert_eq!(
5926            backward.messages,
5927            vec![
5928                "row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"
5929            ]
5930        );
5931        assert_unique_messages(&backward.messages);
5932        assert_eq!(
5933            backward.timestamps,
5934            vec![
5935                start_realtime_usec + 6,
5936                start_realtime_usec + 5,
5937                start_realtime_usec + 4,
5938                start_realtime_usec + 3,
5939                start_realtime_usec + 2,
5940                start_realtime_usec + 1,
5941                start_realtime_usec,
5942            ]
5943        );
5944
5945        let forward = collect_netdata_pages(dir.path(), Direction::Forward, 2);
5946        assert_eq!(
5947            forward.messages,
5948            vec![
5949                "row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"
5950            ]
5951        );
5952        assert_unique_messages(&forward.messages);
5953        assert_eq!(
5954            forward.timestamps,
5955            vec![
5956                start_realtime_usec + 1,
5957                start_realtime_usec,
5958                start_realtime_usec + 3,
5959                start_realtime_usec + 2,
5960                start_realtime_usec + 5,
5961                start_realtime_usec + 4,
5962                start_realtime_usec + 6,
5963            ]
5964        );
5965    }
5966
5967    #[test]
5968    fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
5969        let dir = TempDir::new().expect("tempdir");
5970        let start_realtime_usec = 1_700_000_000_000_000;
5971        write_stepped_netdata_test_journal(
5972            dir.path(),
5973            "tail-contract.journal",
5974            5,
5975            start_realtime_usec,
5976            1,
5977        );
5978        let anchor = start_realtime_usec + 2;
5979
5980        for requested_direction in [Direction::Backward, Direction::Forward] {
5981            let response = run_netdata_contract_request(
5982                dir.path(),
5983                json!({
5984                    "after": 1_700_000_000,
5985                    "before": 1_700_000_010,
5986                    "last": 5,
5987                    "direction": direction_name(requested_direction),
5988                    "data_only": true,
5989                    "tail": true,
5990                    "if_modified_since": anchor,
5991                    "anchor": anchor,
5992                }),
5993            );
5994            assert_eq!(response["status"], 200);
5995            assert_eq!(response["_request"]["direction"], "backward");
5996            assert_eq!(
5997                response_column_strings(&response, "MESSAGE"),
5998                vec!["row-4", "row-3"]
5999            );
6000            assert_eq!(
6001                response_column_u64s(&response, "timestamp"),
6002                vec![start_realtime_usec + 4, start_realtime_usec + 3]
6003            );
6004        }
6005
6006        let no_change = run_netdata_contract_request(
6007            dir.path(),
6008            json!({
6009                "after": 1_700_000_000,
6010                "before": 1_700_000_010,
6011                "last": 5,
6012                "direction": "backward",
6013                "data_only": true,
6014                "tail": true,
6015                "if_modified_since": start_realtime_usec + 4,
6016                "anchor": start_realtime_usec + 4,
6017            }),
6018        );
6019        assert_eq!(no_change["status"], 304);
6020        assert_eq!(
6021            no_change["errorMessage"],
6022            "No new data since the previous call."
6023        );
6024    }
6025
6026    #[test]
6027    fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
6028        let dir = TempDir::new().expect("tempdir");
6029        let start_realtime_usec = 1_700_000_000_000_000;
6030        write_stepped_netdata_test_journal(
6031            dir.path(),
6032            "delta-contract.journal",
6033            5,
6034            start_realtime_usec,
6035            1,
6036        );
6037        let anchor = start_realtime_usec + 1;
6038
6039        let response = run_netdata_contract_request(
6040            dir.path(),
6041            json!({
6042                "after": 1_700_000_000,
6043                "before": 1_700_000_010,
6044                "last": 2,
6045                "direction": "backward",
6046                "data_only": true,
6047                "delta": true,
6048                "tail": true,
6049                "if_modified_since": anchor,
6050                "anchor": anchor,
6051                "facets": ["SERVICE"],
6052                "histogram": "SERVICE",
6053            }),
6054        );
6055
6056        assert_eq!(response["status"], 200);
6057        assert_eq!(
6058            response_column_strings(&response, "MESSAGE"),
6059            vec!["row-4", "row-3"]
6060        );
6061        assert_eq!(
6062            response_facet_count(&response, "facets_delta", "SERVICE", "even"),
6063            2
6064        );
6065        assert_eq!(
6066            response_facet_count(&response, "facets_delta", "SERVICE", "odd"),
6067            1
6068        );
6069        assert_eq!(
6070            response_histogram_total(&response, "histogram_delta", "even"),
6071            2
6072        );
6073        assert_eq!(
6074            response_histogram_total(&response, "histogram_delta", "odd"),
6075            1
6076        );
6077        let items = response["items_delta"].as_object().expect("items_delta");
6078        assert_eq!(items["matched"], 3);
6079        assert_eq!(items["returned"], 2);
6080        assert_eq!(items["after"], 2);
6081    }
6082
6083    #[test]
6084    fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
6085        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);
6086
6087        assert_eq!(adjuster.adjust(10), 10);
6088        assert_eq!(adjuster.adjust(10), 11);
6089        assert_eq!(adjuster.adjust(10), 12);
6090    }
6091
6092    #[test]
6093    fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
6094        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);
6095
6096        assert_eq!(adjuster.adjust(10), 10);
6097        assert_eq!(adjuster.adjust(10), 9);
6098        assert_eq!(adjuster.adjust(10), 8);
6099    }
6100
6101    #[test]
6102    fn systemd_profile_keeps_user_group_ids_raw_by_default() {
6103        let context = DisplayContext::default();
6104        let profile = SystemdJournalProfile;
6105        assert_eq!(
6106            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6107            json!("0")
6108        );
6109        assert_eq!(
6110            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6111            json!("0")
6112        );
6113    }
6114
6115    #[cfg(unix)]
6116    #[test]
6117    fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
6118        let context = DisplayContext::default();
6119        let profile = SystemdJournalPluginProfile;
6120        assert_eq!(
6121            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6122            json!("root")
6123        );
6124        assert_eq!(
6125            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6126            json!("root")
6127        );
6128    }
6129
6130    #[cfg(unix)]
6131    #[test]
6132    fn plugin_compatible_profile_caches_user_group_resolution() {
6133        let context = DisplayContext::default();
6134        let profile = SystemdJournalPluginProfile;
6135
6136        assert_eq!(
6137            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6138            json!("root")
6139        );
6140        assert_eq!(
6141            profile.field_display_value(&context, DisplayScope::Data, "_UID", b"0"),
6142            json!("root")
6143        );
6144        assert_eq!(
6145            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6146            json!("root")
6147        );
6148        assert_eq!(
6149            profile.field_display_value(&context, DisplayScope::Data, "_GID", b"0"),
6150            json!("root")
6151        );
6152
6153        assert_eq!(context.uid_display_cache.borrow().len(), 1);
6154        assert_eq!(context.gid_display_cache.borrow().len(), 1);
6155    }
6156
6157    #[test]
6158    fn file_overlap_uses_netdata_max_realtime_slack() {
6159        let file_first_seconds = 200_000_000u64;
6160        let file_last_seconds = 200_000_100u64;
6161        let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
6162        let header = crate::FileHeader {
6163            signature: *b"LPKSHHRH",
6164            compatible_flags: 0,
6165            incompatible_flags: 0,
6166            state: 0,
6167            header_size: 0,
6168            n_entries: 0,
6169            head_entry_realtime: file_first_seconds * 1_000_000,
6170            tail_entry_realtime: file_last_seconds * 1_000_000,
6171            head_entry_seqnum: 0,
6172            tail_entry_seqnum: 0,
6173            tail_entry_boot_id: [0; 16],
6174            seqnum_id: [0; 16],
6175        };
6176        let config = NetdataFunctionConfig::systemd_journal();
6177
6178        let inside_slack = NetdataRequest::parse(
6179            &json!({
6180                "after": file_last_seconds + slack_seconds - 1,
6181                "before": file_last_seconds + slack_seconds + 500
6182            }),
6183            &config,
6184        )
6185        .expect("parse request");
6186        assert!(file_may_overlap_request(header, &inside_slack));
6187
6188        let outside_slack = NetdataRequest::parse(
6189            &json!({
6190                "after": file_last_seconds + slack_seconds + 1,
6191                "before": file_last_seconds + slack_seconds + 500
6192            }),
6193            &config,
6194        )
6195        .expect("parse request");
6196        assert!(!file_may_overlap_request(header, &outside_slack));
6197    }
6198
6199    #[test]
6200    fn journal_file_order_matches_plugin_comparator_shape() {
6201        let older = JournalFileOrderInfo {
6202            msg_first_realtime_usec: 100,
6203            msg_last_realtime_usec: 200,
6204            file_last_modified_usec: 200,
6205            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6206        };
6207        let newer = JournalFileOrderInfo {
6208            msg_first_realtime_usec: 100,
6209            msg_last_realtime_usec: 300,
6210            file_last_modified_usec: 100,
6211            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6212        };
6213        assert_eq!(
6214            compare_journal_file_order(&newer, &older, Direction::Backward),
6215            Ordering::Less
6216        );
6217        assert_eq!(
6218            compare_journal_file_order(&newer, &older, Direction::Forward),
6219            Ordering::Greater
6220        );
6221
6222        let newer_mtime = JournalFileOrderInfo {
6223            msg_first_realtime_usec: 100,
6224            msg_last_realtime_usec: 200,
6225            file_last_modified_usec: 300,
6226            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6227        };
6228        assert_eq!(
6229            compare_journal_file_order(&newer_mtime, &older, Direction::Backward),
6230            Ordering::Less
6231        );
6232
6233        let newer_first = JournalFileOrderInfo {
6234            msg_first_realtime_usec: 150,
6235            msg_last_realtime_usec: 200,
6236            file_last_modified_usec: 200,
6237            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6238        };
6239        assert_eq!(
6240            compare_journal_file_order(&newer_first, &older, Direction::Backward),
6241            Ordering::Less
6242        );
6243    }
6244
6245    #[test]
6246    fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
6247        let mut boot_first = BTreeMap::new();
6248        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 300);
6249        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 100);
6250        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 200);
6251
6252        assert_eq!(boot_first.get(b"boot-a".as_slice()), Some(&100));
6253    }
6254
6255    #[test]
6256    fn merge_histogram_rejects_inconsistent_bucket_shape() {
6257        let mut target = Some(ExplorerHistogram {
6258            field: b"PRIORITY".to_vec(),
6259            buckets: vec![ExplorerHistogramBucket {
6260                start_realtime_usec: 1_000_000,
6261                end_realtime_usec: 2_000_000,
6262                values: HashMap::new(),
6263            }],
6264        });
6265        let source = ExplorerHistogram {
6266            field: b"PRIORITY".to_vec(),
6267            buckets: vec![ExplorerHistogramBucket {
6268                start_realtime_usec: 1_000_000,
6269                end_realtime_usec: 1_500_000,
6270                values: HashMap::new(),
6271            }],
6272        };
6273
6274        let err = merge_histogram(&mut target, source).expect_err("shape mismatch");
6275        assert!(matches!(
6276            err,
6277            SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
6278        ));
6279    }
6280
6281    #[test]
6282    fn collect_journal_files_recurses_nested_directories() {
6283        let dir = TempDir::new().expect("tempdir");
6284        let nested = dir.path().join("machine").join("nested");
6285        write_named_netdata_test_journal(&nested, "system.journal", 1, 1_700_000_000_000_000);
6286
6287        let collection = collect_journal_files(dir.path()).expect("collect files");
6288
6289        assert_eq!(collection.files.len(), 1);
6290        assert_eq!(collection.skipped, 0);
6291        assert!(collection.errors.is_empty());
6292        assert_eq!(
6293            collection.files[0]
6294                .file_name()
6295                .and_then(|name| name.to_str()),
6296            Some("system.journal")
6297        );
6298    }
6299
6300    #[cfg(unix)]
6301    #[test]
6302    fn collect_journal_files_deduplicates_symlinked_directories() {
6303        let dir = TempDir::new().expect("tempdir");
6304        let real = dir.path().join("real");
6305        let link = dir.path().join("link");
6306        write_named_netdata_test_journal(&real, "system.journal", 1, 1_700_000_000_000_000);
6307        std::os::unix::fs::symlink(&real, &link).expect("symlink");
6308
6309        let collection = collect_journal_files(dir.path()).expect("collect files");
6310
6311        assert_eq!(collection.files.len(), 1);
6312        assert_eq!(collection.skipped, 0);
6313        assert!(collection.errors.is_empty());
6314    }
6315
6316    #[cfg(unix)]
6317    #[test]
6318    fn collect_journal_files_deduplicates_symlinked_files() {
6319        let dir = TempDir::new().expect("tempdir");
6320        write_named_netdata_test_journal(dir.path(), "system.journal", 1, 1_700_000_000_000_000);
6321        std::os::unix::fs::symlink(
6322            dir.path().join("system.journal"),
6323            dir.path().join("system-copy.journal"),
6324        )
6325        .expect("symlink");
6326
6327        let collection = collect_journal_files(dir.path()).expect("collect files");
6328
6329        assert_eq!(collection.files.len(), 1);
6330        assert_eq!(collection.skipped, 0);
6331        assert!(collection.errors.is_empty());
6332    }
6333
6334    #[cfg(unix)]
6335    #[test]
6336    fn collect_journal_files_reports_unreadable_subdirectories() {
6337        use std::os::unix::fs::PermissionsExt;
6338
6339        if unsafe { libc::geteuid() } == 0 {
6340            return;
6341        }
6342
6343        let dir = TempDir::new().expect("tempdir");
6344        std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
6345        let locked = dir.path().join("locked");
6346        std::fs::create_dir(&locked).expect("locked dir");
6347        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
6348            .expect("lock dir");
6349
6350        let collection = collect_journal_files(dir.path()).expect("collect files");
6351
6352        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
6353            .expect("unlock dir");
6354        assert_eq!(collection.files.len(), 1);
6355        assert_eq!(collection.skipped, 1);
6356        assert_eq!(collection.errors.len(), 1);
6357        assert!(collection.errors[0].contains("locked"));
6358    }
6359
6360    #[test]
6361    fn source_summary_fills_missing_caller_metadata_from_header() {
6362        let dir = TempDir::new().expect("tempdir");
6363        write_named_netdata_test_journal(dir.path(), "system.journal", 2, 1_700_000_000_000_000);
6364        let path = dir.path().join("system.journal");
6365        let mut state = TestNetdataState::default();
6366        state.metadata.insert(
6367            path.clone(),
6368            NetdataJournalFileMetadata {
6369                msg_first_realtime_usec: Some(1_699_999_999_000_000),
6370                ..NetdataJournalFileMetadata::default()
6371            },
6372        );
6373        let options = NetdataFunctionRunOptions {
6374            state: Some(&mut state),
6375            ..NetdataFunctionRunOptions::default()
6376        };
6377
6378        let summary = JournalSourceSummary::from_paths(&[path], ReaderOptions::default(), &options);
6379
6380        assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
6381        assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
6382    }
6383
6384    #[test]
6385    fn source_summary_coverage_is_off_below_one_second() {
6386        let first = 1_700_000_000_000_000;
6387        let mut summary = JournalSourceSummary {
6388            files: 1,
6389            total_size: 0,
6390            first_realtime_usec: Some(first),
6391            last_realtime_usec: Some(first + 999_999),
6392        };
6393
6394        assert!(summary.info().contains("covering off"));
6395
6396        summary.last_realtime_usec = Some(first + 1_000_000);
6397        assert!(summary.info().contains("covering 1s"));
6398    }
6399
6400    #[test]
6401    fn netdata_function_uses_default_source_selector_metadata() {
6402        let dir = TempDir::new().expect("tempdir");
6403        let response = NetdataJournalFunction::systemd_journal_plugin_compatible()
6404            .run_directory_request_json(dir.path(), &json!({"info": true}))
6405            .expect("run info request");
6406        let source = response["required_params"]
6407            .as_array()
6408            .expect("required params")
6409            .iter()
6410            .find(|param| param["id"] == "__logs_sources")
6411            .expect("source selector param");
6412
6413        assert_eq!(source["id"], "__logs_sources");
6414        assert_eq!(source["name"], DEFAULT_SOURCE_SELECTOR_NAME);
6415        assert_eq!(source["help"], DEFAULT_SOURCE_SELECTOR_HELP);
6416    }
6417
6418    #[test]
6419    fn netdata_function_uses_custom_source_selector_metadata() {
6420        let dir = TempDir::new().expect("tempdir");
6421        let mut config = NetdataFunctionConfig::systemd_journal();
6422        config.source_selector_name = "Trap Jobs".to_string();
6423        config.source_selector_help = "Select the trap job to query".to_string();
6424        let function = NetdataJournalFunction::new(config, SystemdJournalPluginProfile);
6425
6426        let response = function
6427            .run_directory_request_json(dir.path(), &json!({"info": true}))
6428            .expect("run info request");
6429        let source = response["required_params"]
6430            .as_array()
6431            .expect("required params")
6432            .iter()
6433            .find(|param| param["id"] == "__logs_sources")
6434            .expect("source selector param");
6435
6436        assert_eq!(source["id"], "__logs_sources");
6437        assert_eq!(source["name"], "Trap Jobs");
6438        assert_eq!(source["help"], "Select the trap job to query");
6439    }
6440
6441    #[test]
6442    fn netdata_function_empty_source_selector_metadata_falls_back_to_defaults() {
6443        let dir = TempDir::new().expect("tempdir");
6444        let mut config = NetdataFunctionConfig::systemd_journal();
6445        config.source_selector_name.clear();
6446        config.source_selector_help.clear();
6447        let function = NetdataJournalFunction::new(config, SystemdJournalPluginProfile);
6448
6449        let response = function
6450            .run_directory_request_json(dir.path(), &json!({"info": true}))
6451            .expect("run info request");
6452        let source = response["required_params"]
6453            .as_array()
6454            .expect("required params")
6455            .iter()
6456            .find(|param| param["id"] == "__logs_sources")
6457            .expect("source selector param");
6458
6459        assert_eq!(source["id"], "__logs_sources");
6460        assert_eq!(source["name"], DEFAULT_SOURCE_SELECTOR_NAME);
6461        assert_eq!(source["help"], DEFAULT_SOURCE_SELECTOR_HELP);
6462    }
6463
6464    #[test]
6465    fn source_selection_echoes_and_filters_known_groups() {
6466        let config = NetdataFunctionConfig::systemd_journal();
6467        let request = NetdataRequest::parse(
6468            &json!({
6469                "selections": {
6470                    "__logs_sources": ["all-local-system-logs"]
6471                }
6472            }),
6473            &config,
6474        )
6475        .expect("parse source-filtered request");
6476
6477        assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
6478        assert_eq!(
6479            request.echo.get("source_type").and_then(Value::as_u64),
6480            Some(SOURCE_TYPE_LOCAL_SYSTEM)
6481        );
6482        assert!(
6483            request
6484                .echo
6485                .pointer("/selections/__logs_sources/0")
6486                .is_some_and(Value::is_null)
6487        );
6488        assert!(request.matches_source(Path::new("/var/log/journal/machine/system.journal"), None));
6489        assert!(!request.matches_source(
6490            Path::new("/var/log/journal/machine/user-1000.journal"),
6491            None
6492        ));
6493    }
6494
6495    #[test]
6496    fn source_selection_uses_caller_metadata_before_filename_fallback() {
6497        let dir = TempDir::new().expect("tempdir");
6498        write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
6499        let path = dir.path().join("user-1000.journal");
6500        let config = NetdataFunctionConfig::systemd_journal();
6501        let request = NetdataRequest::parse(
6502            &json!({
6503                "after": 1_700_000_000,
6504                "before": 1_700_000_001,
6505                "selections": {
6506                    "__logs_sources": ["all-local-system-logs"]
6507                }
6508            }),
6509            &config,
6510        )
6511        .expect("parse source-filtered request");
6512        assert!(!request.matches_source(&path, None));
6513
6514        let mut state = TestNetdataState::default();
6515        state.metadata.insert(
6516            path.clone(),
6517            NetdataJournalFileMetadata {
6518                source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
6519                source_name: Some("system-registry".to_string()),
6520                ..NetdataJournalFileMetadata::default()
6521            },
6522        );
6523        let options = NetdataFunctionRunOptions {
6524            state: Some(&mut state),
6525            ..NetdataFunctionRunOptions::default()
6526        };
6527        let selected =
6528            select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
6529
6530        assert_eq!(selected.files.len(), 1);
6531    }
6532
6533    #[test]
6534    fn netdata_function_state_receives_learned_source_realtime_delta() {
6535        let dir = TempDir::new().expect("tempdir");
6536        let commit_realtime_usec = 1_700_000_030_000_000;
6537        let source_realtime_usec = commit_realtime_usec - 30_000_000;
6538        write_source_realtime_delta_journal(
6539            dir.path(),
6540            "system.journal",
6541            commit_realtime_usec,
6542            source_realtime_usec,
6543        );
6544        let request = json!({
6545            "after": 1_700_000_029,
6546            "before": 1_700_000_031,
6547            "facets": ["SERVICE"],
6548            "histogram": "SERVICE",
6549            "last": 1,
6550            "sampling": 0
6551        });
6552        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
6553        let mut state = TestNetdataState::default();
6554        let options = NetdataFunctionRunOptions {
6555            state: Some(&mut state),
6556            ..NetdataFunctionRunOptions::from_timeout_seconds(0)
6557        };
6558
6559        let response = function
6560            .run_directory_request_json_with_options(dir.path(), &request, options)
6561            .expect("run function");
6562
6563        assert_eq!(response["status"], 200);
6564        assert_eq!(state.updates.len(), 1);
6565        assert_eq!(state.updates[0].1, 30_000_000);
6566    }
6567
6568    #[test]
6569    fn source_classification_matches_plugin_filename_shape() {
6570        assert_eq!(
6571            journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
6572            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
6573        );
6574        assert_eq!(
6575            journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
6576            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
6577        );
6578        assert_eq!(
6579            journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
6580            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
6581        );
6582        assert_eq!(
6583            journal_file_source_type(Path::new(
6584                "/var/log/journal/machine.namespace/system.journal"
6585            )),
6586            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
6587        );
6588        assert_eq!(
6589            journal_file_source_type(Path::new(
6590                "/var/log/journal/remote/remote-host-a@machine.journal"
6591            )),
6592            SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
6593        );
6594    }
6595
6596    #[test]
6597    fn exact_source_names_follow_plugin_prefixes() {
6598        assert_eq!(
6599            journal_file_exact_source_name(Path::new(
6600                "/var/log/journal/machine.namespace/system.journal"
6601            ))
6602            .as_deref(),
6603            Some("namespace-namespace")
6604        );
6605        assert_eq!(
6606            journal_file_exact_source_name(Path::new(
6607                "/var/log/journal/remote/remote-host-a@machine.journal"
6608            ))
6609            .as_deref(),
6610            Some("remote-host-a")
6611        );
6612        assert_eq!(
6613            journal_file_exact_source_name(Path::new(
6614                "/var/log/journal/remote/remote-host-b.journal~.zst"
6615            ))
6616            .as_deref(),
6617            Some("remote-host-b")
6618        );
6619    }
6620
6621    #[test]
6622    fn disposed_journal_extension_matches_plugin_scan_contract() {
6623        assert!(is_journal_file_name(Path::new("active.journal")));
6624        assert!(is_journal_file_name(Path::new("archived.journal~")));
6625        assert!(is_journal_file_name(Path::new("active.journal.zst")));
6626        assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
6627    }
6628
6629    fn test_located_row(realtime_usec: u64) -> LocatedRow {
6630        LocatedRow {
6631            file_path: PathBuf::from("test.journal"),
6632            row: ExplorerRow {
6633                realtime_usec,
6634                cursor: String::new(),
6635                payloads: Vec::new(),
6636            },
6637        }
6638    }
6639
6640    fn write_source_realtime_delta_journal(
6641        directory: &std::path::Path,
6642        name: &str,
6643        commit_realtime_usec: u64,
6644        source_realtime_usec: u64,
6645    ) {
6646        std::fs::create_dir_all(directory).expect("create journal dir");
6647        let path = directory.join(name);
6648        let repo_file = RepoFile::from_path(&path).expect("repo file");
6649        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6650        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6651        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6652        let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6653        let payloads: [&[u8]; 4] = [
6654            b"MESSAGE=source lag test".as_slice(),
6655            b"SERVICE=delta".as_slice(),
6656            b"PRIORITY=6".as_slice(),
6657            source.as_bytes(),
6658        ];
6659        writer
6660            .add_entry(
6661                &mut file,
6662                &payloads,
6663                commit_realtime_usec,
6664                commit_realtime_usec,
6665            )
6666            .expect("write entry");
6667        file.sync().expect("sync journal");
6668    }
6669}