Skip to main content

journal/
netdata.rs

1use crate::explorer::{
2    ExplorerSamplingState, empty_histogram_for_query, histogram_bucket_count_for_query,
3};
4use crate::{
5    Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
6    ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
7    ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
8    FileReader, ReaderOptions, Result, SdkError,
9};
10use chrono::{DateTime, Utc};
11use serde_json::{Map, Value, json};
12use std::cell::RefCell;
13use std::cmp::{Ordering, Reverse};
14use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
15#[cfg(unix)]
16use std::ffi::CStr;
17use std::path::{Path, PathBuf};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
/// Function name used by the `NetdataFunctionConfig::systemd_journal` preset.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
// NOTE(review): the request defaults and limits below are not referenced in the part of
// the file reviewed here; the unit remarks are derived from the names and values only.
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
/// 3600 seconds, i.e. one hour.
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
/// Three 365-day years expressed in seconds.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
/// 600 seconds, i.e. ten minutes.
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
/// One hundred 365-day years in seconds; used as the timeout when the caller asks for
/// zero seconds and as the default timeout of `NetdataFunctionRunOptions`.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
/// Five seconds in microseconds; passed to `to_explorer_query` when no per-file delta
/// is available.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
/// Two minutes in microseconds.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
/// Facet option id emitted for a zero-count, empty-string facet value.
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
/// Facet option name emitted for a zero-count, empty-string facet value.
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Source-type bit flags: each constant occupies its own bit so they can be OR-ed together.
// They are re-exported publicly as `NETDATA_SOURCE_TYPE_*`.
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
43
// Public aliases of the private `SOURCE_TYPE_*` bit flags (bit 0 through bit 6).
/// Bit 0: alias of `SOURCE_TYPE_ALL`.
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
/// Bit 1: alias of `SOURCE_TYPE_LOCAL_ALL`.
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
/// Bit 2: alias of `SOURCE_TYPE_REMOTE_ALL`.
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
/// Bit 3: alias of `SOURCE_TYPE_LOCAL_SYSTEM`.
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
/// Bit 4: alias of `SOURCE_TYPE_LOCAL_USER`.
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
/// Bit 5: alias of `SOURCE_TYPE_LOCAL_NAMESPACE`.
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
/// Bit 6: alias of `SOURCE_TYPE_LOCAL_OTHER`.
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
51
/// Fixed list of request parameter names.
///
/// NOTE(review): not referenced in the part of the file reviewed here; presumably the
/// base list that `accepted_params_from_fields` reports alongside facet field names —
/// confirm against that helper.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];
70
/// Field names copied into `NetdataFunctionConfig::default_view_keys` by the
/// systemd-journal preset, in this order.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];
95
/// Field names copied into `NetdataFunctionConfig::default_facets` by the
/// systemd-journal preset, in this order.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
156
/// Configuration of a `NetdataJournalFunction`.
///
/// `NetdataFunctionConfig::systemd_journal` builds the systemd-journal preset, which is
/// also what `Default` returns.
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Function name (`"systemd-journal"` in the preset).
    pub function_name: String,
    /// Default facet field names; handed to request parsing together with the rest of
    /// the config (presumably used when a request names no facets — TODO confirm).
    pub default_facets: Vec<String>,
    /// Field names placed in the response column order right after `timestamp` and
    /// `rowOptions`.
    pub default_view_keys: Vec<String>,
    /// Default histogram field, if any (`"PRIORITY"` in the preset).
    pub default_histogram: Option<String>,
    /// Options used whenever a journal file is opened.
    pub reader_options: ReaderOptions,
    /// Strategy passed to the reader when exploring each file.
    pub explorer_strategy: ExplorerStrategy,
}
166
167impl NetdataFunctionConfig {
168    pub fn systemd_journal() -> Self {
169        Self {
170            function_name: DEFAULT_FUNCTION_NAME.to_string(),
171            default_facets: SYSTEMD_DEFAULT_FACETS
172                .iter()
173                .map(|field| (*field).to_string())
174                .collect(),
175            default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
176                .iter()
177                .map(|field| (*field).to_string())
178                .collect(),
179            default_histogram: Some("PRIORITY".to_string()),
180            reader_options: ReaderOptions::snapshot(),
181            explorer_strategy: ExplorerStrategy::Traversal,
182        }
183    }
184}
185
/// The default configuration is the systemd-journal preset.
impl Default for NetdataFunctionConfig {
    /// Returns `NetdataFunctionConfig::systemd_journal()`.
    fn default() -> Self {
        Self::systemd_journal()
    }
}
191
/// Lookup data and caches handed to `NetdataFunctionProfile` display hooks.
#[derive(Debug, Default)]
pub struct DisplayContext {
    // Filled from `collect_boot_first_realtime` when a query response is built.
    // Presumably maps a boot id to the first realtime timestamp seen for it — TODO confirm.
    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    // Interior-mutable string-to-string caches, so they can be filled through `&DisplayContext`.
    // Presumably raw UID/GID text -> display text — TODO confirm against the display helper.
    uid_display_cache: RefCell<BTreeMap<String, String>>,
    gid_display_cache: RefCell<BTreeMap<String, String>>,
}
198
/// Tells a display hook which part of the response a value is being rendered for.
#[derive(Debug, Clone, Copy)]
pub enum DisplayScope {
    /// Value is rendered into a data row cell.
    Data,
    /// Value is rendered as a facet option name.
    Facet,
    /// Presumably used while rendering histogram dimensions — that call site is not in
    /// the reviewed part of the file; TODO confirm.
    Histogram,
}
205
206pub trait NetdataFunctionProfile {
207    fn field_display_value(
208        &self,
209        _context: &DisplayContext,
210        _scope: DisplayScope,
211        _field: &str,
212        value: &[u8],
213    ) -> Value {
214        Value::String(String::from_utf8_lossy(value).into_owned())
215    }
216
217    fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
218        match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
219            Value::String(value) => value,
220            other => other.to_string(),
221        }
222    }
223
224    fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
225        if let Some(priority) = first_value(fields, "PRIORITY") {
226            return json!({ "severity": priority_to_row_severity(priority) });
227        }
228        json!({ "severity": "normal" })
229    }
230}
231
/// Profile whose display values come from `systemd_field_display_value` with its last
/// argument set to `false`. This is the default profile of `NetdataJournalFunction`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalProfile;
234
/// Profile whose display values come from `systemd_field_display_value` with its last
/// argument set to `true` (presumably selecting plugin-compatible rendering — TODO confirm).
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalPluginProfile;
237
impl NetdataFunctionProfile for SystemdJournalProfile {
    /// Delegates to `systemd_field_display_value`, passing `false` as its last argument.
    /// The other trait methods keep their defaults.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, false)
    }
}
249
impl NetdataFunctionProfile for SystemdJournalPluginProfile {
    /// Delegates to `systemd_field_display_value`, passing `true` as its last argument.
    /// The other trait methods keep their defaults.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, true)
    }
}
261
/// A Netdata-style journal function: a configuration plus a presentation profile `P`
/// (defaults to `SystemdJournalProfile`).
#[derive(Debug, Clone)]
pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    // Function configuration (defaults, reader options, explorer strategy).
    config: NetdataFunctionConfig,
    // Display hooks used when building rows, facets and histograms.
    profile: P,
}
267
/// Progress snapshot delivered to `NetdataFunctionRunOptions::progress_callback`.
///
/// NOTE(review): the code that fills this struct is not in the reviewed part of the
/// file; field descriptions below follow the field names and types only.
#[derive(Debug, Clone)]
pub struct NetdataFunctionProgress {
    /// Position of the file being processed (whether 0- or 1-based is not visible here).
    pub current_file: usize,
    /// Number of files selected for the run.
    pub total_files: usize,
    /// Count of files opened and explored so far.
    pub matched_files: u64,
    /// Count of files skipped so far.
    pub skipped_files: u64,
    /// Explorer statistics at the time of the snapshot.
    pub stats: ExplorerStats,
    /// Time elapsed at the time of the snapshot.
    pub elapsed: Duration,
}
277
/// Optional per-file metadata that a `NetdataFunctionState` can supply for a journal file.
///
/// Every field is optional; `Default` yields all `None`.
/// NOTE(review): consumers of these fields are not in the reviewed part of the file;
/// descriptions follow the field names and types only.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetdataJournalFileMetadata {
    /// Source type; presumably a combination of `NETDATA_SOURCE_TYPE_*` flags — TODO confirm.
    pub source_type: Option<u64>,
    /// Source name.
    pub source_name: Option<String>,
    /// File modification time, in microseconds per the field name.
    pub file_last_modified_usec: Option<u64>,
    /// Realtime timestamp of the first message, in microseconds per the field name.
    pub msg_first_realtime_usec: Option<u64>,
    /// Realtime timestamp of the last message, in microseconds per the field name.
    pub msg_last_realtime_usec: Option<u64>,
    /// Journal-vs-realtime delta for this file, in microseconds per the field name.
    pub journal_vs_realtime_delta_usec: Option<u64>,
}
287
/// Caller-provided state that can supply, and receive updates to, per-file metadata.
///
/// Both methods have defaults, so an implementor can override only what it needs.
pub trait NetdataFunctionState {
    /// Returns known metadata for the journal file at `_path`; the default knows nothing
    /// and returns `None`.
    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
        None
    }

    /// Receives a journal-vs-realtime delta (microseconds) for the file at `_path`;
    /// the default ignores it.
    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
}
295
/// Per-run options: timeout, progress reporting, cancellation and optional caller state.
pub struct NetdataFunctionRunOptions<'a> {
    /// Maximum run time; turned into a deadline when a directory request starts.
    /// `None` means no deadline is set.
    pub timeout: Option<Duration>,
    /// Optional callback that receives `NetdataFunctionProgress` snapshots.
    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional callback handed to the explorer control as its cancellation callback
    /// (presumably returns `true` to cancel — TODO confirm).
    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller state (see `NetdataFunctionState`).
    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Interval handed to the explorer control as its progress interval.
    pub progress_interval: Duration,
}
303
304impl NetdataFunctionRunOptions<'_> {
305    pub fn from_timeout_seconds(seconds: u64) -> Self {
306        let seconds = if seconds == 0 {
307            EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
308        } else {
309            seconds
310        };
311        Self {
312            timeout: Some(Duration::from_secs(seconds)),
313            progress_callback: None,
314            cancellation_callback: None,
315            state: None,
316            progress_interval: Duration::from_millis(250),
317        }
318    }
319}
320
321impl Default for NetdataFunctionRunOptions<'_> {
322    fn default() -> Self {
323        Self {
324            timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
325            progress_callback: None,
326            cancellation_callback: None,
327            state: None,
328            progress_interval: Duration::from_millis(250),
329        }
330    }
331}
332
333impl NetdataJournalFunction<SystemdJournalProfile> {
334    pub fn systemd_journal() -> Self {
335        Self {
336            config: NetdataFunctionConfig::systemd_journal(),
337            profile: SystemdJournalProfile,
338        }
339    }
340}
341
342impl NetdataJournalFunction<SystemdJournalPluginProfile> {
343    pub fn systemd_journal_plugin_compatible() -> Self {
344        Self {
345            config: NetdataFunctionConfig::systemd_journal(),
346            profile: SystemdJournalPluginProfile,
347        }
348    }
349}
350
351impl<P> NetdataJournalFunction<P>
352where
353    P: NetdataFunctionProfile,
354{
    /// Creates a function from an explicit configuration and display profile.
    pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
        Self { config, profile }
    }
358
    /// Runs a JSON request against the journal files under `directory` using
    /// `NetdataFunctionRunOptions::default()`.
    ///
    /// See `run_directory_request_json_with_options` for the behavior and errors.
    pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
        self.run_directory_request_json_with_options(
            directory,
            request,
            NetdataFunctionRunOptions::default(),
        )
    }
366
367    pub fn run_directory_request_json_with_options(
368        &self,
369        directory: &Path,
370        request: &Value,
371        mut options: NetdataFunctionRunOptions<'_>,
372    ) -> Result<Value> {
373        let request = NetdataRequest::parse(request, &self.config)?;
374        let collection = collect_journal_files(directory)?;
375        let paths = collection.files;
376        if request.info {
377            return Ok(self.info_response(request.echo, &paths, &options));
378        }
379        let annotation_paths = paths.clone();
380
381        let selected =
382            select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
383        if let Some(response) = not_modified_before_scan_response(&request, &selected) {
384            return Ok(response);
385        }
386        let selected_files = selected.files;
387        let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
388        let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
389        self.finalize_combined_result(
390            &request,
391            &selected_files,
392            deadline,
393            &mut options,
394            &mut combined,
395            collection.skipped,
396            collection.errors,
397        )?;
398        Ok(self.query_response(request, &annotation_paths, combined))
399    }
400
    /// Completes `combined` after exploration, before the response is built.
    ///
    /// Always folds the directory-scan `skipped_files` count and `file_errors` into
    /// `combined`. If the run was cancelled nothing else happens. Otherwise:
    /// - unless the request is `data_only`, zero-count facet values are added from the
    ///   files and from the request's selected filters;
    /// - when `should_collect_unfiltered_facet_vocabulary` says so, the selected files
    ///   are explored a second time with `request.unfiltered_vocabulary()` and the
    ///   facets found are added as zero-count values.
    ///
    /// # Errors
    ///
    /// Propagates errors from the second exploration pass.
    fn finalize_combined_result(
        &self,
        request: &NetdataRequest,
        selected_files: &[SelectedJournalFile],
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &mut CombinedResult,
        skipped_files: u64,
        file_errors: Vec<String>,
    ) -> Result<()> {
        combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
        combined.file_errors.extend(file_errors);
        // A cancelled run is answered with an error response, so no enrichment is needed.
        if combined.cancelled {
            return Ok(());
        }
        if !request.data_only {
            combined.add_zero_count_facet_values_from_files(
                &request.facets,
                self.config.reader_options,
            );
            combined.add_zero_count_selected_filter_values(request);
        }
        if should_collect_unfiltered_facet_vocabulary(request, combined) {
            // Second pass shares the original deadline and run options.
            let vocabulary = self.explore_files(
                selected_files,
                &request.unfiltered_vocabulary(),
                deadline,
                options,
            )?;
            combined.add_zero_count_facet_values(&vocabulary.facets);
        }
        Ok(())
    }
434
    /// Runs a request given as raw JSON bytes against the journal files under
    /// `directory` using `NetdataFunctionRunOptions::default()`.
    ///
    /// See `run_directory_request_bytes_with_options` for the behavior and errors.
    pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
        self.run_directory_request_bytes_with_options(
            directory,
            request,
            NetdataFunctionRunOptions::default(),
        )
    }
442
    /// Parses `request` as JSON and runs it via `run_directory_request_json_with_options`.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::InvalidPath` carrying the parser message when `request` is not
    /// valid JSON, and otherwise propagates errors from running the request.
    pub fn run_directory_request_bytes_with_options(
        &self,
        directory: &Path,
        request: &[u8],
        options: NetdataFunctionRunOptions<'_>,
    ) -> Result<Value> {
        // NOTE(review): a JSON syntax error is reported through the `InvalidPath` variant.
        let request: Value = serde_json::from_slice(request).map_err(|err| {
            SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
        })?;
        self.run_directory_request_json_with_options(directory, &request, options)
    }
454
    /// Explores `files` one after another and merges the outcome into one `CombinedResult`.
    ///
    /// State shared across all files: the page window, the sampling state (if the
    /// request-level query enables sampling) and the realtime adjuster. For every file
    /// the loop:
    /// 1. stops early when `should_stop_before_file` says so;
    /// 2. opens the file, skipping it when opening fails;
    /// 3. records it as matched, builds the per-file query and collects column fields;
    /// 4. explores it and passes the outcome to `record_explore_result`, skipping the
    ///    file when that yields nothing;
    /// 5. calls `finish_explored_file` and stops when it returns `true`.
    ///
    /// After the loop the row payloads are expanded and the page counters stored.
    ///
    /// # Errors
    ///
    /// Propagates errors from `open_file_for_explore` and `finish_explored_file`. The
    /// `Result` of exploring a single file is not propagated here; it is handed to
    /// `record_explore_result`.
    fn explore_files(
        &self,
        files: &[SelectedJournalFile],
        request: &NetdataRequest,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
    ) -> Result<CombinedResult> {
        // Request-level query: only used here to decide on sampling and to size the
        // sampling state. Each file gets its own query further down.
        let query = request.to_explorer_query(
            files.len() as u64,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let mut combined = CombinedResult::default();
        // RefCell: the window is read and updated from callbacks that only hold `&`.
        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
        combined.sampling_enabled = query.sampling.is_some();
        let mut sampling_state =
            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
        let started = Instant::now();
        let total_files = files.len();
        for (file_index, file) in files.iter().enumerate() {
            let path = &file.path;
            if should_stop_before_file(&mut combined, deadline, options) {
                break;
            }
            // `None` means the file could not be opened and was already accounted for.
            let Some(mut reader) = self.open_file_for_explore(
                path,
                &mut combined,
                options,
                progress_context(file_index, total_files, started),
            )?
            else {
                continue;
            };
            combined.matched_files = combined.matched_files.saturating_add(1);
            combined.matched_paths.push(path.clone());
            // Shadows the request-level query with the query for this file.
            let query = request.file_query(files.len(), reader.header(), &file.order);
            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
            let explored = self.explore_single_file(
                &mut reader,
                request,
                &query,
                deadline,
                options,
                &combined,
                &page_window,
                sampling_state.as_mut(),
                &realtime_adjuster,
                progress_context(file_index, total_files, started),
                file.order.journal_vs_realtime_delta_usec,
            );
            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
            else {
                continue;
            };
            // `true` means no further file should be explored.
            if finish_explored_file(
                options,
                request,
                file,
                &query,
                result,
                stop_reason,
                &mut combined,
                files,
                file_index,
                progress_context(file_index, total_files, started),
            )? {
                break;
            }
        }
        combined.expand_row_payloads(self.config.reader_options);
        combined.page_counters = Some(page_window.into_inner().counters());
        Ok(combined)
    }
529
530    fn open_file_for_explore(
531        &self,
532        path: &Path,
533        combined: &mut CombinedResult,
534        options: &mut NetdataFunctionRunOptions<'_>,
535        progress: ProgressContext,
536    ) -> Result<Option<FileReader>> {
537        match FileReader::open_with_options(path, self.config.reader_options) {
538            Ok(reader) => Ok(Some(reader)),
539            Err(err) => {
540                combined.skipped_files = combined.skipped_files.saturating_add(1);
541                combined
542                    .file_errors
543                    .push(format!("{}: {err}", path.display()));
544                emit_progress_for_combined(options, combined, progress);
545                Ok(None)
546            }
547        }
548    }
549
    /// Explores one opened journal file under an `ExplorerControl` wired to this run.
    ///
    /// The control receives the deadline, the caller's cancellation callback, the
    /// progress interval, the shared sampling state and four callbacks:
    /// - progress: forwards explorer progress through `emit_explorer_progress`;
    /// - candidate row: asks the page window whether a row at that timestamp is kept;
    /// - realtime adjust: routes timestamps through the shared realtime adjuster;
    /// - matched row: asks `delta_scan_can_stop` whether the scan may stop.
    ///
    /// Returns the explorer result together with the control's stop reason.
    ///
    /// # Errors
    ///
    /// Propagates the error of the reader's explore call.
    #[allow(clippy::too_many_arguments)]
    fn explore_single_file(
        &self,
        reader: &mut FileReader,
        request: &NetdataRequest,
        query: &ExplorerQuery,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &CombinedResult,
        page_window: &RefCell<NetdataPageWindow>,
        sampling_state: Option<&mut ExplorerSamplingState>,
        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
        progress: ProgressContext,
        realtime_delta_usec: u64,
    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
        // Copied out first: the progress closure below captures `options` itself.
        let cancellation_callback = options.cancellation_callback;
        let progress_interval = options.progress_interval;
        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
            emit_explorer_progress(options, combined, explorer_progress, progress);
        };
        let mut control = ExplorerControl::new();
        control.set_deadline(deadline);
        control.set_cancellation_callback(cancellation_callback);
        control.set_progress_interval(progress_interval);
        control.set_progress_callback(Some(&mut explorer_progress));
        control.set_sampling_state(sampling_state);
        let mut candidate_row =
            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
        control.set_candidate_row_callback(Some(&mut candidate_row));
        let mut adjust_realtime =
            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
        let mut matched_row = |realtime_usec, rows_matched| {
            delta_scan_can_stop(
                request,
                page_window,
                realtime_usec,
                rows_matched,
                realtime_delta_usec,
            )
        };
        control.set_matched_row_callback(Some(&mut matched_row));
        let result = reader.explore_with_strategy_cursor_rows_controlled(
            query,
            self.config.explorer_strategy,
            &mut control,
        )?;
        Ok((result, control.stop_reason()))
    }
599
    /// Builds the response to an `info` request.
    ///
    /// The response echoes the request under `_request`, reports the API and SDK
    /// versions, lists accepted parameters (computed with no facet fields) and the
    /// required source parameters for `paths`, and declares pagination on the `anchor`
    /// key. Its status is always 200 and its type `table`.
    fn info_response(
        &self,
        echo: Value,
        paths: &[PathBuf],
        options: &NetdataFunctionRunOptions<'_>,
    ) -> Value {
        json!({
            "_request": echo,
            "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
            "v": 3,
            "accepted_params": self.accepted_params_from_fields(&[]),
            "required_params": self.required_source_params(paths, options),
            "show_ids": true,
            "has_history": true,
            "pagination": {
                "enabled": true,
                "key": "anchor",
                "column": "timestamp",
                "units": "timestamp_usec",
            },
            "status": 200,
            "type": "table",
            "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
        })
    }
625
    /// Turns the combined exploration result into the final query response.
    ///
    /// A cancelled run yields a 499 error response. Otherwise the response artifacts
    /// are built, the base response is assembled and metadata is added to it. If the
    /// base response is unexpectedly not a JSON object, a 500 error response is
    /// returned instead.
    fn query_response(
        &self,
        request: NetdataRequest,
        annotation_paths: &[PathBuf],
        combined: CombinedResult,
    ) -> Value {
        if combined.cancelled {
            return netdata_function_error(499, "Request cancelled.");
        }
        let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
        let mut response = base_query_response(&request, &combined, &artifacts);
        let Some(object) = response.as_object_mut() else {
            return netdata_function_error(500, "Internal Netdata function response error.");
        };
        self.add_query_response_metadata(object, &request, &combined, artifacts);
        response
    }
643
644    fn query_response_artifacts(
645        &self,
646        request: &NetdataRequest,
647        annotation_paths: &[PathBuf],
648        combined: &CombinedResult,
649    ) -> QueryResponseArtifacts {
650        let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
651        let reportable_facet_field_names = string_fields(&reportable_facet_fields);
652        let columns = self.build_columns(
653            &request,
654            &reportable_facet_fields,
655            &combined.rows,
656            &combined.facets,
657            &combined.column_fields,
658        );
659        let boot_ids = response_boot_ids(
660            &columns.order,
661            &combined.rows,
662            &combined.facets,
663            combined.histogram.as_ref(),
664        );
665        let context = DisplayContext {
666            boot_first_realtime: collect_boot_first_realtime(
667                annotation_paths,
668                self.config.reader_options,
669                &boot_ids,
670            ),
671            ..DisplayContext::default()
672        };
673        let data =
674            self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
675        let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
676        let histogram = self.query_histogram_artifact(request, combined, &context);
677        let message = query_message(combined.timed_out, &combined.stats);
678        let items = response_items(request, combined, data.len() as u64);
679        QueryResponseArtifacts {
680            reportable_facet_field_names,
681            columns,
682            data,
683            facets,
684            histogram,
685            message,
686            items,
687        }
688    }
689
    /// Adds the metadata entries of a query response to `object`.
    ///
    /// A request that is not `data_only` gets the full metadata set. A `data_only`
    /// request gets only `available_histograms`, and only when it names a histogram.
    /// Afterwards the last-modified, sampling and analysis-output helpers each add
    /// their entries if needed; `artifacts` is consumed by the last of them.
    fn add_query_response_metadata(
        &self,
        object: &mut Map<String, Value>,
        request: &NetdataRequest,
        combined: &CombinedResult,
        artifacts: QueryResponseArtifacts,
    ) {
        if !request.data_only {
            self.add_full_query_response_metadata(object, request, combined, &artifacts);
        } else if request.histogram.is_some() {
            object.insert(
                "available_histograms".to_string(),
                self.available_histograms(request, combined),
            );
        }
        add_last_modified_if_needed(object, request, combined);
        add_sampling_if_needed(object, combined);
        add_analysis_outputs_if_needed(object, request, artifacts);
    }
709
    /// Builds the histogram part of the response, if there is one.
    ///
    /// - When exploration produced a histogram, that histogram is rendered.
    /// - Otherwise, when the request names no histogram field, the result is `None`.
    /// - Otherwise, for a `data_only` request without `delta`, the result is `None`.
    /// - Otherwise an empty histogram for the requested field is rendered.
    ///
    /// In both rendered cases the facet values known for the histogram field are passed
    /// along as known values.
    fn query_histogram_artifact(
        &self,
        request: &NetdataRequest,
        combined: &CombinedResult,
        context: &DisplayContext,
    ) -> Option<Value> {
        if let Some(histogram) = combined.histogram.as_ref() {
            return Some(self.build_histogram(
                context,
                histogram,
                combined.facets.get(&histogram.field),
            ));
        }
        let histogram_field = request.histogram.as_ref()?;
        if request.data_only && !request.delta {
            return None;
        }
        // No histogram was collected: synthesize an empty one from a request-level query.
        let query = request.to_explorer_query(
            combined.matched_files,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let histogram = empty_histogram_for_query(histogram_field.as_bytes(), &query);
        Some(self.build_histogram(
            context,
            &histogram,
            combined.facets.get(histogram.field.as_slice()),
        ))
    }
739
740    fn add_full_query_response_metadata(
741        &self,
742        object: &mut Map<String, Value>,
743        request: &NetdataRequest,
744        combined: &CombinedResult,
745        artifacts: &QueryResponseArtifacts,
746    ) {
747        object.insert("message".to_string(), artifacts.message.clone());
748        object.insert("update_every".to_string(), Value::from(1));
749        object.insert("help".to_string(), Value::Null);
750        object.insert(
751            "accepted_params".to_string(),
752            self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
753        );
754        object.insert("default_sort_column".to_string(), Value::from("timestamp"));
755        object.insert("default_charts".to_string(), Value::Array(Vec::new()));
756        object.insert(
757            "available_histograms".to_string(),
758            self.available_histograms(request, combined),
759        );
760    }
761
762    fn build_columns(
763        &self,
764        request: &NetdataRequest,
765        reportable_facet_fields: &[Vec<u8>],
766        rows: &[LocatedRow],
767        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
768        column_fields: &BTreeSet<String>,
769    ) -> Columns {
770        let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
771        push_unique_many(&mut order, &self.config.default_view_keys);
772        push_unique_many(&mut order, &string_fields(reportable_facet_fields));
773        if let Some(histogram) = &request.histogram {
774            push_unique(&mut order, histogram);
775        }
776        for field in column_fields {
777            push_unique(&mut order, field);
778        }
779
780        for (field, values) in facets {
781            if !facet_group_is_reportable(values) {
782                continue;
783            }
784            push_unique(&mut order, &String::from_utf8_lossy(field));
785        }
786        for row in rows {
787            let fields = row_fields(row);
788            for field in fields.keys() {
789                push_unique(&mut order, field);
790            }
791        }
792
793        let mut map = Map::new();
794        for (index, key) in order.iter().enumerate() {
795            map.insert(key.clone(), column_metadata(key, index));
796        }
797        Columns { order, map }
798    }
799
800    fn build_data_rows(
801        &self,
802        context: &DisplayContext,
803        column_order: &[String],
804        rows: &[LocatedRow],
805        direction: Direction,
806    ) -> Vec<Value> {
807        let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
808            Direction::Forward => Box::new(rows.iter().rev()),
809            Direction::Backward => Box::new(rows.iter()),
810        };
811        row_iter
812            .map(|located| {
813                let fields = row_fields(located);
814                let mut row = Vec::with_capacity(column_order.len());
815                for column in column_order {
816                    let value = match column.as_str() {
817                        "timestamp" => Value::from(located.row.realtime_usec),
818                        "rowOptions" => self.profile.row_options(&fields),
819                        field => first_value(&fields, field)
820                            .map(|value| {
821                                self.profile.field_display_value(
822                                    context,
823                                    DisplayScope::Data,
824                                    field,
825                                    value,
826                                )
827                            })
828                            .unwrap_or(Value::Null),
829                    };
830                    row.push(value);
831                }
832                Value::Array(row)
833            })
834            .collect()
835    }
836
837    fn build_facets(
838        &self,
839        context: &DisplayContext,
840        requested: &[Vec<u8>],
841        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
842    ) -> Value {
843        let mut out = Vec::new();
844        for (order, field) in requested.iter().enumerate() {
845            let values = facets.get(field);
846            let field_name = String::from_utf8_lossy(field).into_owned();
847            let mut options: Vec<_> = values
848                .into_iter()
849                .flat_map(|values| values.iter())
850                .filter(|(value, count)| {
851                    (!value.is_empty() && value.as_slice() != b"-")
852                        || (**count == 0 && value.is_empty())
853                })
854                .map(|(value, count)| {
855                    if *count == 0 && value.is_empty() {
856                        return json!({
857                            "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
858                            "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
859                            "count": count,
860                        });
861                    }
862                    json!({
863                        "id": String::from_utf8_lossy(value).into_owned(),
864                        "name": self.profile.facet_option_name(context, &field_name, value),
865                        "count": count,
866                    })
867                })
868                .collect();
869            sort_facet_options(&field_name, &mut options);
870            for (idx, option) in options.iter_mut().enumerate() {
871                if let Some(object) = option.as_object_mut() {
872                    object.insert("order".to_string(), Value::from((idx + 1) as u64));
873                }
874            }
875            out.push(json!({
876                "id": field_name,
877                "name": String::from_utf8_lossy(field).into_owned(),
878                "order": order + 1,
879                "options": options,
880            }));
881        }
882        Value::Array(out)
883    }
884
885    fn build_histogram(
886        &self,
887        context: &DisplayContext,
888        histogram: &ExplorerHistogram,
889        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
890    ) -> Value {
891        let field = String::from_utf8_lossy(&histogram.field).into_owned();
892        let mut dimension_ids = BTreeSet::new();
893        let mut buckets = Vec::with_capacity(histogram.buckets.len());
894        for bucket in &histogram.buckets {
895            let mut values = BTreeMap::new();
896            for (value, count) in &bucket.values {
897                add_netdata_facet_count(&mut values, value, *count);
898            }
899            for value in values.keys() {
900                dimension_ids.insert(value.clone());
901            }
902            buckets.push((bucket.start_realtime_usec, values));
903        }
904        let actual_dimension_ids = dimension_ids.clone();
905        if let Some(known_values) = known_values {
906            for value in known_values.keys() {
907                if value.is_empty() || value.as_slice() == b"-" {
908                    continue;
909                }
910                dimension_ids.insert(value.clone());
911            }
912        }
913        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
914        let metadata = self.histogram_chart_metadata(
915            context,
916            &field,
917            &buckets,
918            &actual_dimension_ids,
919            &dimension_ids,
920        );
921        let data: Vec<Value> = buckets
922            .iter()
923            .map(|(start_realtime_usec, values)| {
924                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
925                point.push(Value::from(start_realtime_usec / 1000));
926                for value in &dimension_ids {
927                    let count = values
928                        .get(value)
929                        .copied()
930                        .map(Value::from)
931                        .unwrap_or_else(|| {
932                            if actual_dimension_ids.contains(value) {
933                                Value::from(0)
934                            } else {
935                                Value::Null
936                            }
937                        });
938                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
939                }
940                Value::Array(point)
941            })
942            .collect();
943
944        json!({
945            "id": field,
946            "name": field,
947            "chart": {
948                "summary": metadata.summary,
949                "totals": metadata.totals,
950                "result": {
951                    "labels": metadata.result_labels,
952                    "point": { "value": 0, "arp": 1, "pa": 2 },
953                    "data": data,
954                },
955                "db": {
956                    "tiers": 1,
957                    "update_every": histogram_update_every_seconds(histogram),
958                    "units": "events",
959                    "dimensions": {
960                        "ids": metadata.ids.clone(),
961                        "names": metadata.names.clone(),
962                        "units": metadata.units.clone(),
963                        "sts": metadata.stats.clone(),
964                    },
965                    "per_tier": [{
966                        "tier": 0,
967                        "queries": 1,
968                        "points": metadata.points,
969                        "update_every": histogram_update_every_seconds(histogram),
970                    }],
971                },
972                "view": {
973                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
974                    "update_every": histogram_update_every_seconds(histogram),
975                    "after": histogram_after_seconds(histogram),
976                    "before": histogram_before_seconds(histogram),
977                    "units": "events",
978                    "chart_type": "stackedBar",
979                    "dimensions": {
980                        "grouped_by": ["dimension"],
981                        "ids": metadata.ids,
982                        "names": metadata.names,
983                        "colors": metadata.colors,
984                        "units": metadata.units,
985                        "sts": metadata.stats,
986                    },
987                    "min": metadata.min,
988                    "max": metadata.max,
989                },
990                "agents": [{
991                    "mg": "default",
992                    "nm": "facets.histogram",
993                    "now": now_unix_seconds(),
994                    "ai": 0,
995                }],
996            }
997        })
998    }
999
1000    fn histogram_chart_metadata(
1001        &self,
1002        context: &DisplayContext,
1003        field: &str,
1004        buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
1005        actual_dimensions: &BTreeSet<Vec<u8>>,
1006        dimensions: &[Vec<u8>],
1007    ) -> HistogramChartMetadata {
1008        let mut ids = Vec::with_capacity(dimensions.len());
1009        let mut names = Vec::with_capacity(dimensions.len());
1010        let mut colors = Vec::with_capacity(dimensions.len());
1011        let mut units = Vec::with_capacity(dimensions.len());
1012        let mut min_values = Vec::with_capacity(dimensions.len());
1013        let mut max_values = Vec::with_capacity(dimensions.len());
1014        let mut avg_values = Vec::with_capacity(dimensions.len());
1015        let mut arp_values = Vec::with_capacity(dimensions.len());
1016        let mut con_values = Vec::with_capacity(dimensions.len());
1017        let mut summary_dimensions = Vec::with_capacity(dimensions.len());
1018        let mut result_labels = vec![Value::String("time".to_string())];
1019
1020        let total: u64 = dimensions
1021            .iter()
1022            .map(|dimension| {
1023                buckets
1024                    .iter()
1025                    .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
1026                    .sum::<u64>()
1027            })
1028            .sum();
1029
1030        let mut min = 0;
1031        let mut max = 0;
1032        let mut points = 0;
1033        for (priority, dimension) in dimensions.iter().enumerate() {
1034            let id = String::from_utf8_lossy(dimension).into_owned();
1035            let display = match self.profile.field_display_value(
1036                context,
1037                DisplayScope::Histogram,
1038                field,
1039                dimension,
1040            ) {
1041                Value::String(value) => value,
1042                other => other.to_string(),
1043            };
1044            let (dimension_min, dimension_max, dimension_sum, actual) =
1045                histogram_dimension_stats(buckets, actual_dimensions, dimension);
1046            let dimension_average = if actual && !buckets.is_empty() {
1047                dimension_sum as f64 / buckets.len() as f64
1048            } else {
1049                0.0
1050            };
1051            if actual {
1052                if points == 0 || dimension_min < min {
1053                    min = dimension_min;
1054                }
1055                if dimension_max > max {
1056                    max = dimension_max;
1057                }
1058                points += buckets.len() as u64;
1059            }
1060            let contribution = if total > 0 {
1061                dimension_sum as f64 * 100.0 / total as f64
1062            } else {
1063                0.0
1064            };
1065
1066            ids.push(Value::String(id.clone()));
1067            names.push(Value::String(display.clone()));
1068            colors.push(Value::Null);
1069            units.push(Value::String("events".to_string()));
1070            result_labels.push(Value::String(display.clone()));
1071            min_values.push(Value::from(dimension_min));
1072            max_values.push(Value::from(dimension_max));
1073            avg_values.push(Value::from(dimension_average));
1074            arp_values.push(Value::from(0));
1075            con_values.push(Value::from(contribution));
1076            summary_dimensions.push(json!({
1077                "id": id,
1078                "nm": display,
1079                "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
1080                "sts": {
1081                    "min": dimension_min,
1082                    "max": dimension_max,
1083                    "avg": dimension_average,
1084                    "con": contribution,
1085                },
1086                "pri": priority as u64,
1087            }));
1088        }
1089
1090        let stats = json!({
1091            "min": min_values,
1092            "max": max_values,
1093            "avg": avg_values,
1094            "arp": arp_values,
1095            "con": con_values,
1096        });
1097        let summary_stats = json!({
1098            "min": min,
1099            "max": max,
1100            "avg": histogram_average(total, points),
1101            "con": 100.0,
1102        });
1103        let dimensions_len = dimensions.len() as u64;
1104
1105        HistogramChartMetadata {
1106            ids,
1107            names,
1108            colors,
1109            units,
1110            stats,
1111            summary: json!({
1112                "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
1113                "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
1114                "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
1115                "dimensions": summary_dimensions,
1116                "labels": [],
1117                "alerts": [],
1118            }),
1119            totals: histogram_totals(dimensions_len),
1120            result_labels,
1121            min,
1122            max,
1123            points,
1124        }
1125    }
1126
1127    fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1128        NETDATA_ACCEPTED_PARAMS
1129            .iter()
1130            .copied()
1131            .chain(fields.iter().map(String::as_str))
1132            .map(|field| Value::String(field.to_string()))
1133            .collect()
1134    }
1135
1136    fn required_source_params(
1137        &self,
1138        paths: &[PathBuf],
1139        options: &NetdataFunctionRunOptions<'_>,
1140    ) -> Value {
1141        let mut all = JournalSourceSummary::default();
1142        let mut local = JournalSourceSummary::default();
1143        let mut local_namespaces = JournalSourceSummary::default();
1144        let mut local_system = JournalSourceSummary::default();
1145        let mut local_user = JournalSourceSummary::default();
1146        let mut remote = JournalSourceSummary::default();
1147        let mut other = JournalSourceSummary::default();
1148        let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1149
1150        for path in paths {
1151            let metadata = file_metadata(options, path);
1152            let source_type = metadata
1153                .as_ref()
1154                .and_then(|metadata| metadata.source_type)
1155                .unwrap_or_else(|| journal_file_source_type(path));
1156            all.add_path(path, self.config.reader_options, metadata.as_ref());
1157            if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1158                local.add_path(path, self.config.reader_options, metadata.as_ref());
1159            }
1160            if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1161                local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1162            }
1163            if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1164                local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1165            }
1166            if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1167                local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1168            }
1169            if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1170                remote.add_path(path, self.config.reader_options, metadata.as_ref());
1171            }
1172            if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1173                other.add_path(path, self.config.reader_options, metadata.as_ref());
1174            }
1175            let source_name = metadata
1176                .as_ref()
1177                .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1178                .or_else(|| journal_file_exact_source_name(path));
1179            if let Some(source_name) = source_name {
1180                exact.entry(source_name).or_default().add_path(
1181                    path,
1182                    self.config.reader_options,
1183                    metadata.as_ref(),
1184                );
1185            }
1186        }
1187
1188        let mut source_options = Vec::new();
1189        push_source_option(&mut source_options, "all", &all);
1190        push_source_option(&mut source_options, "all-local-logs", &local);
1191        push_source_option(
1192            &mut source_options,
1193            "all-local-namespaces",
1194            &local_namespaces,
1195        );
1196        push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1197        push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1198        push_source_option(&mut source_options, "all-remote-systems", &remote);
1199        push_source_option(&mut source_options, "all-uncategorized", &other);
1200        for (name, summary) in exact {
1201            push_source_option(&mut source_options, &name, &summary);
1202        }
1203
1204        json!([{
1205            "id": "__logs_sources",
1206            "name": "Journal Sources",
1207            "help": "Select the logs source to query",
1208            "type": "multiselect",
1209            "options": source_options,
1210        }])
1211    }
1212
1213    fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1214        let mut fields = combined.reportable_facet_fields(&request.facets);
1215        if request.data_only {
1216            if let Some(histogram) = &request.histogram {
1217                push_unique(&mut fields, histogram);
1218            }
1219        }
1220        let mut sorted = fields.clone();
1221        sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1222        let order_by_field: BTreeMap<String, usize> = sorted
1223            .into_iter()
1224            .enumerate()
1225            .map(|(index, field)| (field, index + 1))
1226            .collect();
1227
1228        fields
1229            .into_iter()
1230            .map(|field| {
1231                let order = order_by_field.get(&field).copied().unwrap_or(0);
1232                json!({
1233                    "id": field,
1234                    "name": field,
1235                    "order": order,
1236                })
1237            })
1238            .collect()
1239    }
1240}
1241
1242struct HistogramChartMetadata {
1243    ids: Vec<Value>,
1244    names: Vec<Value>,
1245    colors: Vec<Value>,
1246    units: Vec<Value>,
1247    stats: Value,
1248    summary: Value,
1249    totals: Value,
1250    result_labels: Vec<Value>,
1251    min: u64,
1252    max: u64,
1253    points: u64,
1254}
1255
/// Returns `(min, max, sum, actual)` of the per-bucket counts of `dimension`.
///
/// `actual` is `false`, and the statistics are all zero, when `dimension` is
/// not part of `actual_dimensions`. A bucket that does not mention the
/// dimension counts as `0`. With no buckets at all the statistics are zero.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }
    let mut counts = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0));
    let Some(first) = counts.next() else {
        return (0, 0, 0, true);
    };
    // Seed every statistic with the first bucket, then fold in the rest.
    let (min, max, sum) = counts.fold((first, first, first), |(min, max, sum), count| {
        (min.min(count), max.max(count), sum + count)
    });
    (min, max, sum, true)
}
1279
/// Returns `total / points` as a float, or `0.0` when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        count => total as f64 / count as f64,
    }
}
1287
1288fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1289    let mut node = json!({
1290        "mg": "default",
1291        "nm": "facets.histogram",
1292        "ni": 0,
1293        "st": { "ai": 0, "code": 200, "msg": "" },
1294    });
1295    add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1296    node
1297}
1298
1299fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1300    let mut context = json!({ "id": "facets.histogram" });
1301    add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1302    context
1303}
1304
1305fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1306    let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1307    add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1308    instance
1309}
1310
1311fn add_histogram_summary_shape(
1312    object: &mut Value,
1313    dimensions: u64,
1314    points: u64,
1315    stats: &Value,
1316    include_instances: bool,
1317) {
1318    let Some(map) = object.as_object_mut() else {
1319        return;
1320    };
1321    if dimensions > 0 {
1322        if include_instances {
1323            map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1324        }
1325        map.insert(
1326            "ds".to_string(),
1327            json!({ "sl": dimensions, "qr": dimensions }),
1328        );
1329    }
1330    if points > 0 {
1331        map.insert("sts".to_string(), stats.clone());
1332    }
1333}
1334
1335fn histogram_totals(dimensions: u64) -> Value {
1336    let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1337    if dimensions > 0 {
1338        if let Some(map) = totals.as_object_mut() {
1339            map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1340            map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1341            map.insert(
1342                "dimensions".to_string(),
1343                json!({ "sl": dimensions, "qr": dimensions }),
1344            );
1345        }
1346    }
1347    totals
1348}
1349
/// Maps `true` to `1` and `false` to `0`.
fn bool_to_u64(value: bool) -> u64 {
    u64::from(value)
}
1353
1354fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1355    histogram
1356        .buckets
1357        .first()
1358        .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1359        .unwrap_or(0)
1360}
1361
1362fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1363    histogram
1364        .buckets
1365        .last()
1366        .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1367        .unwrap_or(0)
1368}
1369
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1376
1377#[derive(Debug, Clone)]
1378struct NetdataRequest {
1379    info: bool,
1380    echo: Value,
1381    after_realtime_usec: Option<u64>,
1382    before_realtime_usec: Option<u64>,
1383    if_modified_since_usec: u64,
1384    anchor: ExplorerAnchor,
1385    direction: Direction,
1386    limit: usize,
1387    data_only: bool,
1388    delta: bool,
1389    tail: bool,
1390    sampling: u64,
1391    source_type: u64,
1392    exact_sources: Vec<String>,
1393    filters: Vec<ExplorerFilter>,
1394    facets: Vec<Vec<u8>>,
1395    histogram: Option<String>,
1396    fts_terms: Vec<ExplorerFtsPattern>,
1397    fts_patterns: Vec<Vec<u8>>,
1398    fts_negative_patterns: Vec<Vec<u8>>,
1399}
1400
1401impl NetdataRequest {
1402    fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1403        let object = value.as_object().ok_or_else(|| {
1404            SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1405        })?;
1406        let now_seconds = unix_now_seconds();
1407        let info = get_bool(object, "info").unwrap_or(false);
1408        let after = get_i64(object, "after");
1409        let before = get_i64(object, "before");
1410        let (after_realtime_usec, before_realtime_usec) =
1411            normalize_time_window(now_seconds, after, before);
1412        let direction = request_direction(object);
1413        let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1414        let data_only = get_bool(object, "data_only").unwrap_or(false);
1415        let delta = request_delta(data_only, object);
1416        let tail = request_tail(data_only, if_modified_since_usec, object);
1417        let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1418        let (anchor, direction) = request_anchor_and_direction(
1419            object,
1420            tail,
1421            direction,
1422            after_realtime_usec,
1423            before_realtime_usec,
1424        );
1425        let requested_limit = request_limit(object);
1426        let limit = requested_limit.max(2);
1427        let requested_facets = parse_string_array(object.get("facets"));
1428        let facets = request_facets(&requested_facets, config);
1429        let requested_histogram = request_histogram(object);
1430        let histogram = request_histogram_or_default(&requested_histogram, config);
1431        let requested_query = request_query(object);
1432        let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1433            .as_deref()
1434            .map(parse_fts_query_patterns)
1435            .unwrap_or_default();
1436        let source_selection = parse_source_selection(object.get("selections"));
1437        let filters = parse_filters(object.get("selections"));
1438
1439        let echo_input = RequestEchoInput {
1440            info,
1441            after_realtime_usec,
1442            before_realtime_usec,
1443            if_modified_since_usec,
1444            anchor,
1445            direction,
1446            limit: requested_limit,
1447            data_only,
1448            delta,
1449            tail,
1450            sampling,
1451            source_type: source_selection.source_type,
1452            requested_facets: requested_facets.as_deref(),
1453            selections: object.get("selections"),
1454            histogram: requested_histogram.as_deref(),
1455            query: requested_query.as_deref(),
1456        };
1457        let echo = normalized_request_echo(&echo_input);
1458
1459        Ok(Self {
1460            info,
1461            echo,
1462            after_realtime_usec,
1463            before_realtime_usec,
1464            if_modified_since_usec,
1465            anchor,
1466            direction,
1467            limit,
1468            data_only,
1469            delta,
1470            tail,
1471            sampling,
1472            source_type: source_selection.source_type,
1473            exact_sources: source_selection.exact_sources,
1474            filters,
1475            facets,
1476            histogram,
1477            fts_terms,
1478            fts_patterns,
1479            fts_negative_patterns,
1480        })
1481    }
1482
1483    fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1484        if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1485            return true;
1486        }
1487        if self.source_type & SOURCE_TYPE_ALL != 0 {
1488            return true;
1489        }
1490        let file_source_type = metadata
1491            .and_then(|metadata| metadata.source_type)
1492            .unwrap_or_else(|| journal_file_source_type(path));
1493        if file_source_type & self.source_type != 0 {
1494            return true;
1495        }
1496        if self.exact_sources.is_empty() {
1497            return false;
1498        }
1499        let source_name = metadata
1500            .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1501            .or_else(|| journal_file_exact_source_name(path));
1502        self.exact_sources
1503            .iter()
1504            .any(|source| source_name.as_deref() == Some(source.as_str()))
1505    }
1506
1507    fn to_explorer_query(
1508        &self,
1509        matched_files: u64,
1510        file_header: Option<FileHeader>,
1511        realtime_slack_usec: u64,
1512    ) -> ExplorerQuery {
1513        let analysis_enabled = !self.data_only || self.delta;
1514        let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1515        let backward_page_anchor = self.data_only
1516            && !tail_anchor
1517            && self.direction == Direction::Backward
1518            && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1519        let after_realtime_usec = if tail_anchor {
1520            tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
1521        } else {
1522            self.after_realtime_usec
1523        };
1524        let before_realtime_usec = if backward_page_anchor {
1525            before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
1526        } else {
1527            self.before_realtime_usec
1528        };
1529        let anchor = if tail_anchor || backward_page_anchor {
1530            ExplorerAnchor::Auto
1531        } else {
1532            self.anchor
1533        };
1534        let sampling = (analysis_enabled
1535            && self.sampling != 0
1536            && matched_files != 0
1537            && after_realtime_usec.is_some()
1538            && before_realtime_usec.is_some())
1539        .then(|| {
1540            let header = file_header.unwrap_or(FileHeader {
1541                signature: [0; 8],
1542                compatible_flags: 0,
1543                incompatible_flags: 0,
1544                state: 0,
1545                header_size: 0,
1546                n_entries: 0,
1547                head_entry_realtime: 0,
1548                tail_entry_realtime: 0,
1549                head_entry_seqnum: 0,
1550                tail_entry_seqnum: 0,
1551                tail_entry_boot_id: [0; 16],
1552                seqnum_id: [0; 16],
1553            });
1554            let messages_in_file = header
1555                .tail_entry_seqnum
1556                .checked_sub(header.head_entry_seqnum)
1557                .and_then(|span| span.checked_add(1))
1558                .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1559                .unwrap_or(header.n_entries);
1560            ExplorerSampling {
1561                budget: self.sampling,
1562                matched_files,
1563                file_head_realtime_usec: header.head_entry_realtime,
1564                file_tail_realtime_usec: header.tail_entry_realtime,
1565                file_head_seqnum: header.head_entry_seqnum,
1566                file_tail_seqnum: header.tail_entry_seqnum,
1567                file_entries: messages_in_file,
1568            }
1569        });
1570        ExplorerQuery {
1571            after_realtime_usec,
1572            before_realtime_usec,
1573            anchor,
1574            direction: self.direction,
1575            limit: self.limit,
1576            filters: self.filters.clone(),
1577            facets: analysis_enabled
1578                .then(|| self.facets.clone())
1579                .unwrap_or_default(),
1580            histogram: analysis_enabled
1581                .then(|| {
1582                    self.histogram
1583                        .as_ref()
1584                        .map(|field| field.as_bytes().to_vec())
1585                })
1586                .flatten(),
1587            histogram_after_realtime_usec: self.after_realtime_usec,
1588            histogram_before_realtime_usec: self.before_realtime_usec,
1589            histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1590            fts_terms: self.fts_terms.clone(),
1591            fts_patterns: self.fts_patterns.clone(),
1592            fts_negative_patterns: self.fts_negative_patterns.clone(),
1593            field_mode: ExplorerFieldMode::FirstValue,
1594            exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1595            use_source_realtime: true,
1596            realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1597            stop_when_rows_full: self.data_only && !tail_anchor,
1598            stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1599            sampling,
1600            debug_collect_column_fields_by_row_traversal: false,
1601        }
1602    }
1603
1604    fn file_query(
1605        &self,
1606        matched_files: usize,
1607        file_header: FileHeader,
1608        order: &JournalFileOrderInfo,
1609    ) -> ExplorerQuery {
1610        let mut query = self.to_explorer_query(
1611            matched_files as u64,
1612            Some(file_header),
1613            order.journal_vs_realtime_delta_usec,
1614        );
1615        if self.data_only && self.delta {
1616            query.stop_when_rows_full = false;
1617        }
1618        query
1619    }
1620
1621    fn unfiltered_vocabulary(&self) -> Self {
1622        let mut request = self.clone();
1623        request.filters.clear();
1624        request.histogram = None;
1625        request.limit = 0;
1626        request.fts_terms.clear();
1627        request.fts_patterns.clear();
1628        request.fts_negative_patterns.clear();
1629        request
1630    }
1631
1632    fn distinct_filter_fields(&self) -> usize {
1633        self.filters
1634            .iter()
1635            .map(|filter| filter.field.as_slice())
1636            .collect::<HashSet<_>>()
1637            .len()
1638    }
1639}
1640
1641#[derive(Debug, Clone)]
1642struct LocatedRow {
1643    file_path: PathBuf,
1644    row: ExplorerRow,
1645}
1646
1647#[derive(Debug)]
1648struct NetdataRealtimeAdjuster {
1649    direction: Direction,
1650    last_realtime_from: u64,
1651    last_realtime_to: u64,
1652}
1653
1654impl NetdataRealtimeAdjuster {
1655    fn new(direction: Direction) -> Self {
1656        Self {
1657            direction,
1658            last_realtime_from: 0,
1659            last_realtime_to: 0,
1660        }
1661    }
1662
1663    fn adjust(&mut self, realtime_usec: u64) -> u64 {
1664        match self.direction {
1665            Direction::Backward => {
1666                if realtime_usec >= self.last_realtime_from
1667                    && realtime_usec <= self.last_realtime_to
1668                {
1669                    self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1670                    self.last_realtime_from
1671                } else {
1672                    self.last_realtime_from = realtime_usec;
1673                    self.last_realtime_to = realtime_usec;
1674                    realtime_usec
1675                }
1676            }
1677            Direction::Forward => {
1678                if realtime_usec >= self.last_realtime_from
1679                    && realtime_usec <= self.last_realtime_to
1680                {
1681                    self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1682                    self.last_realtime_to
1683                } else {
1684                    self.last_realtime_from = realtime_usec;
1685                    self.last_realtime_to = realtime_usec;
1686                    realtime_usec
1687                }
1688            }
1689        }
1690    }
1691}
1692
/// Running totals describing a group of journal files (one "source").
#[derive(Default, Debug)]
struct JournalSourceSummary {
    /// Number of files whose filesystem metadata could be read.
    files: u64,
    /// Sum of the on-disk sizes of those files, in bytes.
    total_size: u64,
    /// Smallest first-entry realtime timestamp seen so far, in microseconds.
    first_realtime_usec: Option<u64>,
    /// Largest last-entry realtime timestamp seen so far, in microseconds.
    last_realtime_usec: Option<u64>,
}
1700
1701impl JournalSourceSummary {
1702    #[cfg(test)]
1703    fn from_paths(
1704        paths: &[PathBuf],
1705        reader_options: ReaderOptions,
1706        options: &NetdataFunctionRunOptions<'_>,
1707    ) -> Self {
1708        let mut summary = Self::default();
1709        for path in paths {
1710            let metadata = file_metadata(options, path);
1711            summary.add_path(path, reader_options, metadata.as_ref());
1712        }
1713        summary
1714    }
1715
1716    fn add_path(
1717        &mut self,
1718        path: &Path,
1719        reader_options: ReaderOptions,
1720        metadata: Option<&NetdataJournalFileMetadata>,
1721    ) {
1722        if let Ok(metadata) = std::fs::metadata(path) {
1723            self.files = self.files.saturating_add(1);
1724            self.total_size = self.total_size.saturating_add(metadata.len());
1725        }
1726        if let Some(metadata) = metadata {
1727            let metadata_first = metadata.msg_first_realtime_usec;
1728            let metadata_last = metadata.msg_last_realtime_usec;
1729            if let Some(first) = metadata_first {
1730                self.first_realtime_usec = Some(
1731                    self.first_realtime_usec
1732                        .map_or(first, |current| current.min(first)),
1733                );
1734            }
1735            if let Some(last) = metadata_last {
1736                self.last_realtime_usec = Some(
1737                    self.last_realtime_usec
1738                        .map_or(last, |current| current.max(last)),
1739                );
1740            }
1741            if metadata_first.is_some() && metadata_last.is_some() {
1742                return;
1743            }
1744        }
1745        let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1746            return;
1747        };
1748        let header = reader.header();
1749        if header.head_entry_realtime != 0 {
1750            self.first_realtime_usec = Some(
1751                self.first_realtime_usec
1752                    .map_or(header.head_entry_realtime, |current| {
1753                        current.min(header.head_entry_realtime)
1754                    }),
1755            );
1756        }
1757        if header.tail_entry_realtime != 0 {
1758            self.last_realtime_usec = Some(
1759                self.last_realtime_usec
1760                    .map_or(header.tail_entry_realtime, |current| {
1761                        current.max(header.tail_entry_realtime)
1762                    }),
1763            );
1764        }
1765    }
1766
1767    fn info(&self) -> String {
1768        let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1769            (Some(first), Some(last)) if last > first && (last - first) >= 1_000_000 => {
1770                human_duration_seconds((last - first) / 1_000_000)
1771            }
1772            _ => "off".to_string(),
1773        };
1774        let last_entry = self
1775            .last_realtime_usec
1776            .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1777            .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1778            .unwrap_or_else(|| "unknown".to_string());
1779        format!(
1780            "{} files, total size {}, covering {}, last entry at {}",
1781            self.files,
1782            human_binary_size(self.total_size),
1783            coverage,
1784            last_entry
1785        )
1786    }
1787}
1788
1789fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1790    if summary.files == 0 {
1791        return;
1792    }
1793    target.push(json!({
1794        "id": id,
1795        "name": id,
1796        "info": summary.info(),
1797        "pill": human_binary_size(summary.total_size),
1798    }));
1799}
1800
1801fn expand_located_row_payloads(
1802    located: &mut LocatedRow,
1803    reader_options: ReaderOptions,
1804) -> Result<()> {
1805    let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1806    reader.seek_cursor(&located.row.cursor)?;
1807    if !reader.test_cursor(&located.row.cursor)? {
1808        return Err(SdkError::InvalidCursor(format!(
1809            "selected row cursor is no longer available: {}",
1810            located.row.cursor
1811        )));
1812    }
1813    reader.collect_entry_payloads(&mut located.row.payloads)
1814}
1815
1816#[derive(Debug, Default)]
1817struct CombinedResult {
1818    rows: Vec<LocatedRow>,
1819    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1820    histogram: Option<ExplorerHistogram>,
1821    column_fields: BTreeSet<String>,
1822    stats: ExplorerStats,
1823    page_counters: Option<NetdataPageCounters>,
1824    matched_files: u64,
1825    matched_paths: Vec<PathBuf>,
1826    skipped_files: u64,
1827    file_errors: Vec<String>,
1828    partial: bool,
1829    timed_out: bool,
1830    cancelled: bool,
1831    sampling_enabled: bool,
1832}
1833
/// Snapshot of the counters kept while paging through matching rows.
#[derive(Debug, Default, Clone, Copy)]
struct NetdataPageCounters {
    /// Number of entries counted as matched.
    matched: u64,
    /// Number of entries counted on the "before" side of the page.
    before: u64,
    /// Number of entries counted on the "after" side of the page.
    after: u64,
}
1840
/// Position of the running query within the list of files, plus its start time.
#[derive(Copy, Clone)]
struct ProgressContext {
    /// Index of the file being processed.
    /// NOTE(review): whether this is zero- or one-based is decided by the
    /// caller — confirm there.
    current_file: usize,
    /// Total number of files the query will process.
    total_files: usize,
    /// Instant at which the query started.
    started: Instant,
}
1847
/// Heap of retained row timestamps, ordered so that the entry to evict next
/// is always at the top.
#[derive(Debug)]
enum NetdataPageHeap {
    /// Min-heap (via `Reverse`): `peek` yields the oldest retained timestamp.
    Backward(BinaryHeap<Reverse<u64>>),
    /// Max-heap: `peek` yields the newest retained timestamp.
    Forward(BinaryHeap<u64>),
}
1853
1854#[derive(Debug)]
1855struct NetdataPageWindow {
1856    direction: Direction,
1857    anchor_start_usec: Option<u64>,
1858    anchor_stop_usec: Option<u64>,
1859    limit: usize,
1860    heap: NetdataPageHeap,
1861    oldest_retained_usec: Option<u64>,
1862    newest_retained_usec: Option<u64>,
1863    matched: u64,
1864    skips_before: u64,
1865    skips_after: u64,
1866    shifts: u64,
1867}
1868
1869impl NetdataPageWindow {
1870    fn for_request(request: &NetdataRequest) -> Self {
1871        let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1872            (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1873            (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1874            _ => (None, None),
1875        };
1876        let heap = match request.direction {
1877            Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1878            Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1879        };
1880        Self {
1881            direction: request.direction,
1882            anchor_start_usec,
1883            anchor_stop_usec,
1884            limit: request.limit,
1885            heap,
1886            oldest_retained_usec: None,
1887            newest_retained_usec: None,
1888            matched: 0,
1889            skips_before: 0,
1890            skips_after: if request.tail
1891                && request.delta
1892                && matches!(request.anchor, ExplorerAnchor::Realtime(_))
1893            {
1894                1
1895            } else {
1896                0
1897            },
1898            shifts: 0,
1899        }
1900    }
1901
1902    fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1903        if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1904            return false;
1905        }
1906        if self.retained_len() < self.limit {
1907            return true;
1908        }
1909        self.oldest_retained_usec
1910            .zip(self.newest_retained_usec)
1911            .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1912    }
1913
1914    fn observe(&mut self, realtime_usec: u64) {
1915        if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1916            return;
1917        }
1918        self.matched = self.matched.saturating_add(1);
1919        match (&mut self.heap, self.direction) {
1920            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1921                if heap.len() < self.limit {
1922                    heap.push(Reverse(realtime_usec));
1923                    self.add_retained_bound(realtime_usec);
1924                    return;
1925                }
1926                let Some(Reverse(oldest)) = heap.peek().copied() else {
1927                    heap.push(Reverse(realtime_usec));
1928                    self.refresh_retained_bounds();
1929                    return;
1930                };
1931                if realtime_usec < oldest {
1932                    self.skips_after = self.skips_after.saturating_add(1);
1933                    return;
1934                }
1935                heap.pop();
1936                heap.push(Reverse(realtime_usec));
1937                self.refresh_retained_bounds();
1938                self.shifts = self.shifts.saturating_add(1);
1939            }
1940            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1941                if heap.len() < self.limit {
1942                    heap.push(realtime_usec);
1943                    self.add_retained_bound(realtime_usec);
1944                    return;
1945                }
1946                let Some(newest) = heap.peek().copied() else {
1947                    heap.push(realtime_usec);
1948                    self.refresh_retained_bounds();
1949                    return;
1950                };
1951                if realtime_usec > newest {
1952                    self.skips_before = self.skips_before.saturating_add(1);
1953                    return;
1954                }
1955                heap.pop();
1956                heap.push(realtime_usec);
1957                self.refresh_retained_bounds();
1958                self.shifts = self.shifts.saturating_add(1);
1959            }
1960            _ => {}
1961        }
1962    }
1963
1964    fn retained_len(&self) -> usize {
1965        match &self.heap {
1966            NetdataPageHeap::Backward(heap) => heap.len(),
1967            NetdataPageHeap::Forward(heap) => heap.len(),
1968        }
1969    }
1970
1971    fn add_retained_bound(&mut self, realtime_usec: u64) {
1972        self.oldest_retained_usec = Some(
1973            self.oldest_retained_usec
1974                .map_or(realtime_usec, |current| current.min(realtime_usec)),
1975        );
1976        self.newest_retained_usec = Some(
1977            self.newest_retained_usec
1978                .map_or(realtime_usec, |current| current.max(realtime_usec)),
1979        );
1980    }
1981
1982    fn refresh_retained_bounds(&mut self) {
1983        let (oldest, newest) = match &self.heap {
1984            NetdataPageHeap::Backward(heap) => heap
1985                .iter()
1986                .map(|Reverse(usec)| *usec)
1987                .fold((None, None), retained_bounds_fold),
1988            NetdataPageHeap::Forward(heap) => heap
1989                .iter()
1990                .copied()
1991                .fold((None, None), retained_bounds_fold),
1992        };
1993        self.oldest_retained_usec = oldest;
1994        self.newest_retained_usec = newest;
1995    }
1996
1997    fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1998        match self.direction {
1999            Direction::Backward => {
2000                if self
2001                    .anchor_start_usec
2002                    .is_some_and(|anchor| realtime_usec >= anchor)
2003                {
2004                    self.skips_before = self.skips_before.saturating_add(1);
2005                    return false;
2006                }
2007                if self
2008                    .anchor_stop_usec
2009                    .is_some_and(|anchor| realtime_usec <= anchor)
2010                {
2011                    self.skips_after = self.skips_after.saturating_add(1);
2012                    return false;
2013                }
2014            }
2015            Direction::Forward => {
2016                if self
2017                    .anchor_start_usec
2018                    .is_some_and(|anchor| realtime_usec <= anchor)
2019                {
2020                    self.skips_after = self.skips_after.saturating_add(1);
2021                    return false;
2022                }
2023                if self
2024                    .anchor_stop_usec
2025                    .is_some_and(|anchor| realtime_usec >= anchor)
2026                {
2027                    self.skips_before = self.skips_before.saturating_add(1);
2028                    return false;
2029                }
2030            }
2031        }
2032        true
2033    }
2034
2035    fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2036        match self.direction {
2037            Direction::Backward => {
2038                if self
2039                    .anchor_start_usec
2040                    .is_some_and(|anchor| realtime_usec >= anchor)
2041                {
2042                    return false;
2043                }
2044                if self
2045                    .anchor_stop_usec
2046                    .is_some_and(|anchor| realtime_usec <= anchor)
2047                {
2048                    return false;
2049                }
2050            }
2051            Direction::Forward => {
2052                if self
2053                    .anchor_start_usec
2054                    .is_some_and(|anchor| realtime_usec <= anchor)
2055                {
2056                    return false;
2057                }
2058                if self
2059                    .anchor_stop_usec
2060                    .is_some_and(|anchor| realtime_usec >= anchor)
2061                {
2062                    return false;
2063                }
2064            }
2065        }
2066        true
2067    }
2068
2069    fn counters(&self) -> NetdataPageCounters {
2070        NetdataPageCounters {
2071            matched: self.matched,
2072            before: self.skips_before,
2073            after: self.skips_after.saturating_add(self.shifts),
2074        }
2075    }
2076
2077    fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2078        if self.limit == 0 {
2079            return false;
2080        }
2081        match (&self.heap, self.direction) {
2082            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2083                if heap.len() < self.limit {
2084                    return false;
2085                }
2086                heap.peek().is_some_and(|Reverse(oldest)| {
2087                    realtime_usec < oldest.saturating_sub(slack_usec)
2088                })
2089            }
2090            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2091                if heap.len() < self.limit {
2092                    return false;
2093                }
2094                heap.peek()
2095                    .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2096            }
2097            _ => false,
2098        }
2099    }
2100}
2101
/// Fold step that tracks the minimum and maximum of a stream of timestamps.
///
/// Given the bounds seen so far (both `None` before the first element) and
/// the next timestamp, returns the bounds widened to include it. Both
/// returned bounds are always `Some`.
fn retained_bounds_fold(
    (oldest, newest): (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let lowest = match oldest {
        Some(current) if current < realtime_usec => current,
        _ => realtime_usec,
    };
    let highest = match newest {
        Some(current) if current > realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(lowest), Some(highest))
}
2111
2112impl CombinedResult {
2113    fn merge(
2114        &mut self,
2115        path: &Path,
2116        result: ExplorerResult,
2117        direction: Direction,
2118        limit: usize,
2119    ) -> Result<()> {
2120        let ExplorerResult {
2121            rows,
2122            facets,
2123            histogram,
2124            stats,
2125            column_fields,
2126            ..
2127        } = result;
2128
2129        if let Some(histogram) = histogram {
2130            merge_histogram(&mut self.histogram, histogram)?;
2131        }
2132
2133        self.merge_stats(stats);
2134        for row in rows {
2135            self.rows.push(LocatedRow {
2136                file_path: path.to_path_buf(),
2137                row,
2138            });
2139        }
2140        for field in column_fields {
2141            if let Ok(field) = String::from_utf8(field) {
2142                self.column_fields.insert(field);
2143            }
2144        }
2145        for (field, values) in facets {
2146            let target = self.facets.entry(field).or_default();
2147            for (value, count) in values {
2148                add_netdata_facet_count(target, &value, count);
2149            }
2150        }
2151        self.sort_and_limit(direction, limit);
2152        Ok(())
2153    }
2154
2155    fn add_column_fields<I>(&mut self, fields: I)
2156    where
2157        I: IntoIterator<Item = String>,
2158    {
2159        self.column_fields.extend(fields);
2160    }
2161
2162    fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2163        match direction {
2164            Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2165            Direction::Backward => self
2166                .rows
2167                .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2168        }
2169        make_row_timestamps_unique(&mut self.rows, direction);
2170        if self.rows.len() > limit {
2171            self.rows.truncate(limit);
2172        }
2173        self.stats.rows_returned = self.rows.len() as u64;
2174    }
2175
2176    fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2177        if self.rows.is_empty() {
2178            self.stats.rows_returned = 0;
2179            return;
2180        }
2181
2182        let mut rows = Vec::with_capacity(self.rows.len());
2183        for mut located in self.rows.drain(..) {
2184            if !located.row.payloads.is_empty() {
2185                rows.push(located);
2186                continue;
2187            }
2188            match expand_located_row_payloads(&mut located, reader_options) {
2189                Ok(()) => {
2190                    self.stats.returned_row_expansions =
2191                        self.stats.returned_row_expansions.saturating_add(1);
2192                    rows.push(located);
2193                }
2194                Err(err) => {
2195                    self.partial = true;
2196                    self.file_errors.push(format!(
2197                        "{} cursor {}: {err}",
2198                        located.file_path.display(),
2199                        located.row.cursor
2200                    ));
2201                }
2202            }
2203        }
2204        self.rows = rows;
2205        self.stats.rows_returned = self.rows.len() as u64;
2206    }
2207
2208    fn merge_stats(&mut self, stats: ExplorerStats) {
2209        self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2210        self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2211        self.stats.facet_rows_matched = self
2212            .stats
2213            .facet_rows_matched
2214            .saturating_add(stats.facet_rows_matched);
2215        self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2216        self.stats.rows_unsampled = self
2217            .stats
2218            .rows_unsampled
2219            .saturating_add(stats.rows_unsampled);
2220        self.stats.rows_estimated = self
2221            .stats
2222            .rows_estimated
2223            .saturating_add(stats.rows_estimated);
2224        self.stats.sampling_sampled = self
2225            .stats
2226            .sampling_sampled
2227            .saturating_add(stats.sampling_sampled);
2228        self.stats.sampling_unsampled = self
2229            .stats
2230            .sampling_unsampled
2231            .saturating_add(stats.sampling_unsampled);
2232        self.stats.sampling_estimated = self
2233            .stats
2234            .sampling_estimated
2235            .saturating_add(stats.sampling_estimated);
2236        if stats.last_realtime_usec > self.stats.last_realtime_usec {
2237            self.stats.last_realtime_usec = stats.last_realtime_usec;
2238        }
2239        if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2240            self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2241        }
2242        self.stats.data_refs_seen = self
2243            .stats
2244            .data_refs_seen
2245            .saturating_add(stats.data_refs_seen);
2246        self.stats.data_refs_skipped = self
2247            .stats
2248            .data_refs_skipped
2249            .saturating_add(stats.data_refs_skipped);
2250        self.stats.data_payloads_loaded = self
2251            .stats
2252            .data_payloads_loaded
2253            .saturating_add(stats.data_payloads_loaded);
2254        self.stats.data_objects_classified = self
2255            .stats
2256            .data_objects_classified
2257            .saturating_add(stats.data_objects_classified);
2258        self.stats.data_cache_hits = self
2259            .stats
2260            .data_cache_hits
2261            .saturating_add(stats.data_cache_hits);
2262        self.stats.data_cache_misses = self
2263            .stats
2264            .data_cache_misses
2265            .saturating_add(stats.data_cache_misses);
2266        self.stats.payloads_decompressed = self
2267            .stats
2268            .payloads_decompressed
2269            .saturating_add(stats.payloads_decompressed);
2270        self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2271        self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2272        self.stats.histogram_updates = self
2273            .stats
2274            .histogram_updates
2275            .saturating_add(stats.histogram_updates);
2276        self.stats.returned_row_expansions = self
2277            .stats
2278            .returned_row_expansions
2279            .saturating_add(stats.returned_row_expansions);
2280        self.stats.early_stop_opportunities = self
2281            .stats
2282            .early_stop_opportunities
2283            .saturating_add(stats.early_stop_opportunities);
2284        self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2285    }
2286
2287    fn add_zero_count_facet_values(
2288        &mut self,
2289        vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2290    ) {
2291        for (field, values) in vocabulary {
2292            let target = self.facets.entry(field.clone()).or_default();
2293            for value in values.keys() {
2294                add_netdata_facet_count(target, value, 0);
2295            }
2296        }
2297    }
2298
2299    fn add_zero_count_facet_values_from_files(
2300        &mut self,
2301        fields: &[Vec<u8>],
2302        reader_options: ReaderOptions,
2303    ) {
2304        for path in &self.matched_paths {
2305            let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2306                continue;
2307            };
2308            for field in fields {
2309                let Ok(field_name) = std::str::from_utf8(field) else {
2310                    continue;
2311                };
2312                let Ok(values) = reader.query_unique(field_name) else {
2313                    continue;
2314                };
2315                if values.is_empty() {
2316                    continue;
2317                }
2318                let target = self.facets.entry(field.clone()).or_default();
2319                for value in values {
2320                    add_netdata_facet_count(target, &value, 0);
2321                }
2322            }
2323        }
2324    }
2325
2326    fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2327        let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2328        if let Some(histogram) = &request.histogram {
2329            report_fields.insert(histogram.as_bytes().to_vec());
2330        }
2331        for filter in &request.filters {
2332            if !report_fields.contains(&filter.field) {
2333                continue;
2334            }
2335            let target = self.facets.entry(filter.field.clone()).or_default();
2336            for value in &filter.values {
2337                add_netdata_facet_count(target, value, 0);
2338            }
2339        }
2340    }
2341
2342    fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2343        string_fields(&self.reportable_facet_fields_bytes(requested))
2344    }
2345
2346    fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2347        let mut fields = Vec::new();
2348        for field in requested {
2349            if !fields.contains(field) {
2350                fields.push(field.clone());
2351            }
2352        }
2353        fields
2354    }
2355}
2356
2357fn netdata_function_error(status: u64, message: &str) -> Value {
2358    json!({
2359        "status": status,
2360        "errorMessage": message,
2361    })
2362}
2363
2364fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2365    if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2366        return Value::String("OK".to_string());
2367    }
2368
2369    let total = stats
2370        .rows_examined
2371        .saturating_add(stats.rows_unsampled)
2372        .saturating_add(stats.rows_estimated)
2373        .max(1);
2374    let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2375    let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2376    let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2377
2378    let mut title = String::new();
2379    let mut description = String::new();
2380    let mut status = "notice";
2381    if timed_out {
2382        title.push_str("Query timed-out, incomplete data. ");
2383        description.push_str(
2384            "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2385        );
2386        status = "warning";
2387    }
2388    if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2389        title.push_str(&format!("{real_percent:.2}% real data"));
2390        description.push_str(&format!(
2391            "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2392        ));
2393    }
2394    if stats.rows_unsampled != 0 {
2395        title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2396        description.push_str(&format!(
2397            "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2398        ));
2399    }
2400    if stats.rows_estimated != 0 {
2401        title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2402        description.push_str(&format!(
2403            "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2404        ));
2405    }
2406
2407    json!({
2408        "title": title,
2409        "status": status,
2410        "description": description,
2411    })
2412}
2413
2414fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2415    let mut stats = completed.clone();
2416    stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2417    stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2418    stats.facet_rows_matched = stats
2419        .facet_rows_matched
2420        .saturating_add(current.facet_rows_matched);
2421    stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2422    stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2423    stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2424    stats.sampling_sampled = stats
2425        .sampling_sampled
2426        .saturating_add(current.sampling_sampled);
2427    stats.sampling_unsampled = stats
2428        .sampling_unsampled
2429        .saturating_add(current.sampling_unsampled);
2430    stats.sampling_estimated = stats
2431        .sampling_estimated
2432        .saturating_add(current.sampling_estimated);
2433    if current.last_realtime_usec > stats.last_realtime_usec {
2434        stats.last_realtime_usec = current.last_realtime_usec;
2435    }
2436    if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2437        stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2438    }
2439    stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2440    stats.data_refs_skipped = stats
2441        .data_refs_skipped
2442        .saturating_add(current.data_refs_skipped);
2443    stats.data_payloads_loaded = stats
2444        .data_payloads_loaded
2445        .saturating_add(current.data_payloads_loaded);
2446    stats.data_objects_classified = stats
2447        .data_objects_classified
2448        .saturating_add(current.data_objects_classified);
2449    stats.data_cache_hits = stats
2450        .data_cache_hits
2451        .saturating_add(current.data_cache_hits);
2452    stats.data_cache_misses = stats
2453        .data_cache_misses
2454        .saturating_add(current.data_cache_misses);
2455    stats.payloads_decompressed = stats
2456        .payloads_decompressed
2457        .saturating_add(current.payloads_decompressed);
2458    stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2459    stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2460    stats.histogram_updates = stats
2461        .histogram_updates
2462        .saturating_add(current.histogram_updates);
2463    stats.returned_row_expansions = stats
2464        .returned_row_expansions
2465        .saturating_add(current.returned_row_expansions);
2466    stats.early_stop_opportunities = stats
2467        .early_stop_opportunities
2468        .saturating_add(current.early_stop_opportunities);
2469    stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2470    stats
2471}
2472
/// Column definitions of a query response.
struct Columns {
    /// Column names as an ordered list.
    /// NOTE(review): presumably the display order of the columns — confirm at
    /// the place where this list is built.
    order: Vec<String>,
    /// JSON object emitted as the `columns` member of the response.
    map: Map<String, Value>,
}
2477
/// Pre-rendered pieces of a query response, bundled so they can be passed
/// around together.
struct QueryResponseArtifacts {
    /// Names of the facet fields that are reported.
    reportable_facet_field_names: Vec<String>,
    /// Column definitions; `columns.map` is emitted as `columns`.
    columns: Columns,
    /// Rendered rows, emitted as `data`.
    data: Vec<Value>,
    /// Rendered facets.
    facets: Value,
    /// Rendered histogram, when one is available.
    histogram: Option<Value>,
    /// Rendered user-facing message.
    message: Value,
    /// Rendered item counters.
    items: Value,
}
2487
2488fn base_query_response(
2489    request: &NetdataRequest,
2490    combined: &CombinedResult,
2491    artifacts: &QueryResponseArtifacts,
2492) -> Value {
2493    json!({
2494        "_request": &request.echo,
2495        "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2496        "_journal_files": {
2497            "matched": combined.matched_files,
2498            "skipped": combined.skipped_files,
2499            "errors": &combined.file_errors,
2500        },
2501        "status": 200,
2502        "partial": combined.partial,
2503        "type": "table",
2504        "show_ids": true,
2505        "has_history": true,
2506        "pagination": {
2507            "enabled": true,
2508            "key": "anchor",
2509            "column": "timestamp",
2510            "units": "timestamp_usec",
2511        },
2512        "columns": &artifacts.columns.map,
2513        "data": &artifacts.data,
2514        "_stats": {
2515            "sdk_explorer": &combined.stats,
2516        },
2517        "expires": if request.data_only {
2518            unix_now_seconds().saturating_add(3600)
2519        } else {
2520            0
2521        }
2522    })
2523}
2524
2525fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2526    let unsampled = combined.stats.rows_unsampled;
2527    let estimated = combined.stats.rows_estimated;
2528    let fallback_rows_after_returned =
2529        response_fallback_rows_after_returned(&combined.stats, returned);
2530    let page_counters = combined
2531        .page_counters
2532        .unwrap_or_else(|| NetdataPageCounters {
2533            matched: combined.stats.rows_matched,
2534            before: 0,
2535            after: fallback_rows_after_returned,
2536        });
2537    json!({
2538        "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2539        "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2540        "unsampled": unsampled,
2541        "estimated": estimated,
2542        "returned": returned,
2543        "max_to_return": request.limit as u64,
2544        "before": page_counters.before,
2545        "after": page_counters.after,
2546    })
2547}
2548
2549fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2550    let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2551        stats.rows_examined
2552    } else {
2553        stats.rows_matched
2554    };
2555    source_rows.saturating_sub(returned)
2556}
2557
2558fn add_last_modified_if_needed(
2559    object: &mut Map<String, Value>,
2560    request: &NetdataRequest,
2561    combined: &CombinedResult,
2562) {
2563    if !request.data_only || request.tail {
2564        object.insert(
2565            "last_modified".to_string(),
2566            Value::from(combined.stats.last_realtime_usec),
2567        );
2568    }
2569}
2570
2571fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2572    if combined.sampling_enabled {
2573        object.insert(
2574            "_sampling".to_string(),
2575            json!({
2576                "enabled": true,
2577                "sampled": combined.stats.sampling_sampled,
2578                "unsampled": combined.stats.sampling_unsampled,
2579                "estimated": combined.stats.sampling_estimated,
2580            }),
2581        );
2582    }
2583}
2584
2585fn add_analysis_outputs_if_needed(
2586    object: &mut Map<String, Value>,
2587    request: &NetdataRequest,
2588    artifacts: QueryResponseArtifacts,
2589) {
2590    if !request.data_only || request.delta {
2591        let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2592        object.insert(facets_key.to_string(), artifacts.facets);
2593        object.insert(
2594            histogram_key.to_string(),
2595            artifacts.histogram.unwrap_or(Value::Null),
2596        );
2597        object.insert(items_key.to_string(), artifacts.items);
2598    }
2599}
2600
/// Returns the member names used for (facets, histogram, items).
///
/// Data-only responses use the `_delta` suffixed names.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2608
2609fn merge_histogram(
2610    target: &mut Option<ExplorerHistogram>,
2611    source: ExplorerHistogram,
2612) -> Result<()> {
2613    let Some(target) = target else {
2614        *target = Some(source);
2615        return Ok(());
2616    };
2617    if target.field != source.field
2618        || target.buckets.len() != source.buckets.len()
2619        || target
2620            .buckets
2621            .iter()
2622            .zip(source.buckets.iter())
2623            .any(|(target_bucket, source_bucket)| {
2624                target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2625                    || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2626            })
2627    {
2628        return Err(SdkError::Unsupported(
2629            "inconsistent Netdata histogram bucket shape",
2630        ));
2631    }
2632    for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2633        let Some(target_bucket) = target.buckets.get_mut(index) else {
2634            return Err(SdkError::Unsupported(
2635                "inconsistent Netdata histogram bucket shape",
2636            ));
2637        };
2638        for (value, count) in source_bucket.values {
2639            *target_bucket.values.entry(value).or_default() += count;
2640        }
2641    }
2642    Ok(())
2643}
2644
/// Returns `true` when the group has at least one value that is neither empty nor `-`.
///
/// Counts are ignored; only the keys are inspected.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    for value in values.keys() {
        let is_placeholder = value.is_empty() || value.as_slice() == b"-";
        if !is_placeholder {
            return true;
        }
    }
    false
}
2650
2651fn netdata_facet_value(value: &[u8]) -> &[u8] {
2652    if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2653        &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2654    } else {
2655        value
2656    }
2657}
2658
2659fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2660    *target
2661        .entry(netdata_facet_value(value).to_vec())
2662        .or_default() += count;
2663}
2664
2665fn not_modified_before_scan_response(
2666    request: &NetdataRequest,
2667    selected: &SelectedJournalFiles,
2668) -> Option<Value> {
2669    if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2670        Some(netdata_function_error(
2671            304,
2672            "No new data since the previous call.",
2673        ))
2674    } else {
2675        None
2676    }
2677}
2678
2679fn should_collect_unfiltered_facet_vocabulary(
2680    request: &NetdataRequest,
2681    combined: &CombinedResult,
2682) -> bool {
2683    !request.data_only && !combined.partial && !request.filters.is_empty()
2684}
2685
2686fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2687    ProgressContext {
2688        current_file: file_index + 1,
2689        total_files,
2690        started,
2691    }
2692}
2693
2694fn emit_netdata_progress(
2695    options: &mut NetdataFunctionRunOptions<'_>,
2696    progress: NetdataFunctionProgress,
2697) {
2698    if let Some(callback) = options.progress_callback.as_deref_mut() {
2699        callback(progress);
2700    }
2701}
2702
2703fn emit_progress_for_combined(
2704    options: &mut NetdataFunctionRunOptions<'_>,
2705    combined: &CombinedResult,
2706    context: ProgressContext,
2707) {
2708    emit_netdata_progress(
2709        options,
2710        NetdataFunctionProgress {
2711            current_file: context.current_file,
2712            total_files: context.total_files,
2713            matched_files: combined.matched_files,
2714            skipped_files: combined.skipped_files,
2715            stats: combined.stats.clone(),
2716            elapsed: context.started.elapsed(),
2717        },
2718    );
2719}
2720
2721fn emit_explorer_progress(
2722    options: &mut NetdataFunctionRunOptions<'_>,
2723    combined: &CombinedResult,
2724    progress: ExplorerProgress,
2725    context: ProgressContext,
2726) {
2727    let stats = merged_progress_stats(&combined.stats, &progress.stats);
2728    if let Some(callback) = options.progress_callback.as_deref_mut() {
2729        callback(NetdataFunctionProgress {
2730            current_file: context.current_file,
2731            total_files: context.total_files,
2732            matched_files: combined.matched_files,
2733            skipped_files: combined.skipped_files,
2734            stats,
2735            elapsed: context.started.elapsed(),
2736        });
2737    }
2738}
2739
2740fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2741    options
2742        .cancellation_callback
2743        .is_some_and(|is_cancelled| is_cancelled())
2744}
2745
2746fn should_stop_before_file(
2747    combined: &mut CombinedResult,
2748    deadline: Option<Instant>,
2749    options: &NetdataFunctionRunOptions<'_>,
2750) -> bool {
2751    if request_cancelled(options) {
2752        combined.partial = true;
2753        combined.cancelled = true;
2754        return true;
2755    }
2756    if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2757        combined.partial = true;
2758        combined.timed_out = true;
2759        return true;
2760    }
2761    false
2762}
2763
2764fn collect_column_fields_for_file(
2765    reader: &mut FileReader,
2766    request: &NetdataRequest,
2767    path: &Path,
2768    combined: &mut CombinedResult,
2769) {
2770    if request.data_only {
2771        return;
2772    }
2773    match reader.enumerate_fields_indexed() {
2774        Ok(fields) => combined.add_column_fields(fields),
2775        Err(err) => combined.file_errors.push(format!(
2776            "{}: FIELD index enumeration failed: {err}",
2777            path.display()
2778        )),
2779    }
2780}
2781
2782fn record_explore_result(
2783    result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2784    path: &Path,
2785    combined: &mut CombinedResult,
2786) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2787    match result {
2788        Ok(result) => Some(result),
2789        Err(err) => {
2790            combined.skipped_files = combined.skipped_files.saturating_add(1);
2791            combined
2792                .file_errors
2793                .push(format!("{}: {err}", path.display()));
2794            None
2795        }
2796    }
2797}
2798
2799fn delta_scan_can_stop(
2800    request: &NetdataRequest,
2801    page_window: &RefCell<NetdataPageWindow>,
2802    realtime_usec: u64,
2803    rows_matched: u64,
2804    realtime_delta_usec: u64,
2805) -> bool {
2806    let mut page_window = page_window.borrow_mut();
2807    page_window.observe(realtime_usec);
2808    request.data_only
2809        && request.delta
2810        && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2811        && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2812}
2813
2814#[allow(clippy::too_many_arguments)]
2815fn finish_explored_file(
2816    options: &mut NetdataFunctionRunOptions<'_>,
2817    request: &NetdataRequest,
2818    file: &SelectedJournalFile,
2819    query: &ExplorerQuery,
2820    result: ExplorerResult,
2821    stop_reason: Option<ExplorerStopReason>,
2822    combined: &mut CombinedResult,
2823    files: &[SelectedJournalFile],
2824    file_index: usize,
2825    progress: ProgressContext,
2826) -> Result<bool> {
2827    update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2828    combined.merge(&file.path, result, query.direction, request.limit)?;
2829    emit_progress_for_combined(options, combined, progress);
2830    if request_cancelled(options) {
2831        combined.partial = true;
2832        combined.cancelled = true;
2833        return Ok(true);
2834    }
2835    if let Some(reason) = stop_reason {
2836        combined.partial = true;
2837        match reason {
2838            ExplorerStopReason::TimedOut => combined.timed_out = true,
2839            ExplorerStopReason::Cancelled => combined.cancelled = true,
2840        }
2841        return Ok(true);
2842    }
2843    Ok(request.data_only
2844        && !request.delta
2845        && !request.tail
2846        && combined.rows.len() >= request.limit
2847        && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2848}
2849
2850fn file_metadata(
2851    options: &NetdataFunctionRunOptions<'_>,
2852    path: &Path,
2853) -> Option<NetdataJournalFileMetadata> {
2854    options
2855        .state
2856        .as_deref()
2857        .and_then(|state| state.file_metadata(path))
2858}
2859
2860fn update_learned_realtime_delta(
2861    options: &mut NetdataFunctionRunOptions<'_>,
2862    path: &Path,
2863    order: &JournalFileOrderInfo,
2864    stats: &ExplorerStats,
2865) {
2866    let learned_delta = stats.max_source_realtime_delta_usec;
2867    if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2868        return;
2869    }
2870    let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2871    if learned_delta <= order.journal_vs_realtime_delta_usec {
2872        return;
2873    }
2874    if let Some(state) = options.state.as_deref_mut() {
2875        state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2876    }
2877}
2878
2879fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2880    delta_usec
2881        .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2882        .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2883}
2884
/// Test-only check of whether a file header's time span may overlap the request window.
///
/// A header with a zero tail timestamp is always treated as overlapping. Otherwise the
/// header's span is widened on both sides by `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC`
/// (saturating) before being compared with the request's optional `after`/`before` bounds.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let slack = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC;
    let earliest = header.head_entry_realtime.saturating_sub(slack);
    let latest = header.tail_entry_realtime.saturating_add(slack);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    if ends_before_window {
        return false;
    }
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);
    !starts_after_window
}
2913
2914#[derive(Debug)]
2915struct SelectedJournalFile {
2916    path: PathBuf,
2917    order: JournalFileOrderInfo,
2918}
2919
2920#[derive(Debug, Default)]
2921struct SelectedJournalFiles {
2922    files: Vec<SelectedJournalFile>,
2923    files_are_newer: bool,
2924}
2925
2926fn select_journal_files_for_request(
2927    paths: Vec<PathBuf>,
2928    request: &NetdataRequest,
2929    reader_options: ReaderOptions,
2930    options: &NetdataFunctionRunOptions<'_>,
2931) -> SelectedJournalFiles {
2932    let mut selected = Vec::new();
2933    for path in paths {
2934        let metadata = file_metadata(options, &path);
2935        if !request.matches_source(&path, metadata.as_ref()) {
2936            continue;
2937        }
2938        let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2939        if !journal_file_order_may_overlap_request(&order, request) {
2940            continue;
2941        }
2942        selected.push(SelectedJournalFile { path, order });
2943    }
2944    selected.sort_by(|left, right| {
2945        compare_journal_file_order(&left.order, &right.order, request.direction)
2946            .then_with(|| left.path.cmp(&right.path))
2947    });
2948    let files_are_newer = selected
2949        .iter()
2950        .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2951    SelectedJournalFiles {
2952        files: selected,
2953        files_are_newer,
2954    }
2955}
2956
2957fn remaining_files_cannot_affect_data_page(
2958    combined: &CombinedResult,
2959    request: &NetdataRequest,
2960    files: &[SelectedJournalFile],
2961    next_file_index: usize,
2962) -> bool {
2963    let Some(next) = files.get(next_file_index) else {
2964        return true;
2965    };
2966    let slack = next.order.journal_vs_realtime_delta_usec;
2967    match request.direction {
2968        Direction::Backward => {
2969            let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2970            else {
2971                return false;
2972            };
2973            next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2974        }
2975        Direction::Forward => {
2976            let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2977            else {
2978                return false;
2979            };
2980            next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2981        }
2982    }
2983}
2984
2985fn journal_file_order_may_overlap_request(
2986    info: &JournalFileOrderInfo,
2987    request: &NetdataRequest,
2988) -> bool {
2989    if info.msg_last_realtime_usec == 0 {
2990        return true;
2991    }
2992
2993    let first = info
2994        .msg_first_realtime_usec
2995        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2996    let last = info
2997        .msg_last_realtime_usec
2998        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2999
3000    if request
3001        .after_realtime_usec
3002        .is_some_and(|after| last < after)
3003    {
3004        return false;
3005    }
3006    if request
3007        .before_realtime_usec
3008        .is_some_and(|before| first > before)
3009    {
3010        return false;
3011    }
3012
3013    true
3014}
3015
3016fn collect_boot_first_realtime(
3017    paths: &[PathBuf],
3018    reader_options: ReaderOptions,
3019    needed_boot_ids: &BTreeSet<Vec<u8>>,
3020) -> BTreeMap<Vec<u8>, u64> {
3021    let mut out = BTreeMap::new();
3022    if needed_boot_ids.is_empty() {
3023        return out;
3024    }
3025    for path in paths {
3026        let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
3027            continue;
3028        };
3029        let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
3030            continue;
3031        };
3032        for boot_id in boot_ids {
3033            if !needed_boot_ids.contains(&boot_id) {
3034                continue;
3035            }
3036            let mut match_payload = b"_BOOT_ID=".to_vec();
3037            match_payload.extend_from_slice(&boot_id);
3038            reader.flush_matches();
3039            reader.add_match(&match_payload);
3040            reader.seek_head();
3041            if !reader.next().unwrap_or(false) {
3042                continue;
3043            }
3044            if let Ok(realtime) = reader.get_realtime_usec() {
3045                record_boot_first_realtime(&mut out, boot_id, realtime);
3046            }
3047        }
3048        reader.flush_matches();
3049    }
3050    out
3051}
3052
3053fn response_boot_ids(
3054    column_order: &[String],
3055    rows: &[LocatedRow],
3056    facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3057    histogram: Option<&ExplorerHistogram>,
3058) -> BTreeSet<Vec<u8>> {
3059    let mut boot_ids = BTreeSet::new();
3060    let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3061    if row_needs_boot_id {
3062        for row in rows {
3063            if let Some(values) = row_fields(row).get("_BOOT_ID") {
3064                boot_ids.extend(values.iter().cloned());
3065            }
3066        }
3067    }
3068    if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3069        boot_ids.extend(
3070            values
3071                .keys()
3072                .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3073                .cloned(),
3074        );
3075    }
3076    if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3077        for bucket in &histogram.buckets {
3078            boot_ids.extend(
3079                bucket
3080                    .values
3081                    .keys()
3082                    .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3083                    .cloned(),
3084            );
3085        }
3086    }
3087    boot_ids
3088}
3089
/// Records `realtime_usec` for `boot_id`, keeping the smallest timestamp seen so far.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
3100
3101fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3102    let mut fields = BTreeMap::new();
3103    for payload in &row.row.payloads {
3104        let Some((field, value)) = split_payload(payload) else {
3105            continue;
3106        };
3107        fields
3108            .entry(String::from_utf8_lossy(field).into_owned())
3109            .or_insert_with(Vec::new)
3110            .push(value.to_vec());
3111    }
3112    fields.insert(
3113        "ND_JOURNAL_FILE".to_string(),
3114        vec![row.file_path.display().to_string().into_bytes()],
3115    );
3116    if !fields.contains_key("ND_JOURNAL_PROCESS") {
3117        let process = dynamic_process_name(&fields);
3118        if !process.is_empty() {
3119            fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3120        }
3121    }
3122    fields
3123}
3124
3125fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
3126    let base = first_value(fields, "CONTAINER_NAME")
3127        .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
3128        .or_else(|| first_value(fields, "_COMM"))
3129        .map(|value| String::from_utf8_lossy(value).into_owned())
3130        .unwrap_or_default();
3131    if base.is_empty() {
3132        return "-".to_string();
3133    }
3134    match first_value(fields, "_PID") {
3135        Some(pid) if !pid.is_empty() => {
3136            let pid = String::from_utf8_lossy(pid);
3137            format!("{base}[{pid}]")
3138        }
3139        Some(_) => base,
3140        None => format!("{base}[-]"),
3141    }
3142}
3143
/// Returns the first value recorded for `field`, or `None` when the field is absent or has
/// no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    let first = values.first()?;
    Some(first.as_slice())
}
3150
/// Splits a `FIELD=value` payload at the first `=`.
///
/// Returns `None` when the payload contains no `=`. Either side may be empty.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut halves = payload.splitn(2, |byte| *byte == b'=');
    let field = halves.next()?;
    let value = halves.next()?;
    Some((field, value))
}
3155
3156fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3157    let mut last_from = 0u64;
3158    let mut last_to = 0u64;
3159    let mut initialized = false;
3160    for row in rows {
3161        let timestamp = row.row.realtime_usec;
3162        if initialized && timestamp >= last_from && timestamp <= last_to {
3163            match direction {
3164                Direction::Backward => {
3165                    last_from = last_from.saturating_sub(1);
3166                    row.row.realtime_usec = last_from;
3167                }
3168                Direction::Forward => {
3169                    last_to = last_to.saturating_add(1);
3170                    row.row.realtime_usec = last_to;
3171                }
3172            }
3173        } else {
3174            last_from = timestamp;
3175            last_to = timestamp;
3176            initialized = true;
3177        }
3178    }
3179}
3180
3181fn column_metadata(key: &str, index: usize) -> Value {
3182    let (visible, filter, full_width) = match key {
3183        "timestamp" => (true, "range", false),
3184        "rowOptions" => (false, "none", false),
3185        "_HOSTNAME" => (true, "facet", false),
3186        "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3187        "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3188        _ if systemd_column_is_facet(key) => (false, "facet", false),
3189        _ => (false, "none", false),
3190    };
3191    let column_type = if key == "timestamp" {
3192        "timestamp"
3193    } else if key == "rowOptions" {
3194        "none"
3195    } else {
3196        "string"
3197    };
3198    let visualization = if key == "rowOptions" {
3199        "rowOptions"
3200    } else {
3201        "value"
3202    };
3203    let mut metadata = json!({
3204        "index": index,
3205        "unique_key": key == "timestamp",
3206        "name": if key == "timestamp" { "Timestamp" } else { key },
3207        "visible": visible,
3208        "type": column_type,
3209        "visualization": visualization,
3210        "value_options": {
3211            "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3212            "decimal_points": 0,
3213            "default_value": if key == "timestamp" || key == "rowOptions" {
3214                Value::Null
3215            } else {
3216                Value::String("-".to_string())
3217            },
3218        },
3219        "sort": "ascending",
3220        "sortable": false,
3221        "sticky": false,
3222        "summary": "count",
3223        "filter": filter,
3224        "full_width": full_width,
3225        "wrap": key != "rowOptions",
3226        "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3227    });
3228    if key == "rowOptions" {
3229        if let Some(object) = metadata.as_object_mut() {
3230            object.insert("dummy".to_string(), Value::Bool(true));
3231        }
3232    }
3233    metadata
3234}
3235
/// Decides whether a journal field is offered as a facet.
///
/// `MESSAGE_ID` always is. Any other key is excluded when it contains `MESSAGE` or
/// `TIMESTAMP`, or starts with `__`; everything else is a facet.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
3245
3246fn sort_facet_options(field: &str, options: &mut [Value]) {
3247    options.sort_by(|left, right| {
3248        let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3249        let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3250        if field == "PRIORITY" {
3251            return parse_priority(left_id).cmp(&parse_priority(right_id));
3252        }
3253        let left_count = left
3254            .get("count")
3255            .and_then(Value::as_u64)
3256            .unwrap_or_default();
3257        let right_count = right
3258            .get("count")
3259            .and_then(Value::as_u64)
3260            .unwrap_or_default();
3261        right_count
3262            .cmp(&left_count)
3263            .then_with(|| left_id.cmp(right_id))
3264    });
3265}
3266
3267fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3268    let bytes = query.as_bytes();
3269    let mut index = 0usize;
3270    let mut terms = Vec::new();
3271    let mut positives = Vec::new();
3272    let mut negatives = Vec::new();
3273
3274    while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3275        push_fts_pattern(
3276            pattern,
3277            negative,
3278            &mut terms,
3279            &mut positives,
3280            &mut negatives,
3281        );
3282    }
3283
3284    (terms, positives, negatives)
3285}
3286
3287fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3288    while *index < bytes.len() {
3289        skip_fts_separators(bytes, index);
3290        let negative = consume_fts_negative_marker(bytes, index);
3291        let pattern = read_fts_pattern(bytes, index);
3292        if !pattern.is_empty() {
3293            return Some((pattern, negative));
3294        }
3295    }
3296    None
3297}
3298
/// Advances `*index` past any run of `|` separators starting at the current position.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    let run = bytes.get(*index..).map_or(0, |rest| {
        rest.iter().take_while(|byte| **byte == b'|').count()
    });
    *index += run;
}
3304
/// Consumes a `!` at `*index`, if present.
///
/// Returns `true` (and advances the index by one) when the marker was found.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let found = bytes.get(*index).copied() == Some(b'!');
    if found {
        *index += 1;
    }
    found
}
3313
/// Reads one pattern starting at `*index`, advancing the index past it.
///
/// An unescaped `|` ends the pattern and is consumed. A backslash escapes the next byte,
/// which is then taken literally (including `|` and `\`); the escaping backslash itself is
/// not part of the pattern. A trailing lone backslash is dropped.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut pattern = Vec::new();
    let mut escaped = false;
    while let Some(&byte) = bytes.get(*index) {
        *index += 1;
        match (byte, escaped) {
            (b'\\', false) => escaped = true,
            (b'|', false) => break,
            _ => {
                pattern.push(byte);
                escaped = false;
            }
        }
    }
    pattern
}
3332
3333fn push_fts_pattern(
3334    pattern: Vec<u8>,
3335    negative: bool,
3336    terms: &mut Vec<ExplorerFtsPattern>,
3337    positives: &mut Vec<Vec<u8>>,
3338    negatives: &mut Vec<Vec<u8>>,
3339) {
3340    terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3341    if negative {
3342        negatives.push(pattern);
3343    } else {
3344        positives.push(pattern);
3345    }
3346}
3347
3348fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3349    let Some(Value::Object(selections)) = value else {
3350        return Vec::new();
3351    };
3352    let mut filters = Vec::new();
3353    for (field, values) in selections {
3354        if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3355            continue;
3356        }
3357        let Some(values) = parse_string_array(Some(values)) else {
3358            continue;
3359        };
3360        filters.push(ExplorerFilter::new(
3361            field.as_bytes().to_vec(),
3362            values
3363                .into_iter()
3364                .map(|value| normalize_filter_value(field, &value)),
3365        ));
3366    }
3367    filters
3368}
3369
/// Journal sources selected by a request.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags that were requested by name.
    source_type: u64,
    /// Requested names that did not map to a `SOURCE_TYPE_*` flag.
    exact_sources: Vec<String>,
}
3375
3376fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3377    let mut selection = SourceSelection {
3378        source_type: SOURCE_TYPE_ALL,
3379        exact_sources: Vec::new(),
3380    };
3381    let Some(Value::Object(selections)) = value else {
3382        return selection;
3383    };
3384    let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3385        return selection;
3386    };
3387    selection.source_type = 0;
3388    for value in values {
3389        match source_type_for_name(&value) {
3390            Some(source_type) => selection.source_type |= source_type,
3391            None => selection.exact_sources.push(value),
3392        }
3393    }
3394    selection
3395}
3396
3397fn source_type_for_name(value: &str) -> Option<u64> {
3398    match value {
3399        "all" => Some(SOURCE_TYPE_ALL),
3400        "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3401        "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3402        "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3403        "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3404        "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3405        "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3406        _ => None,
3407    }
3408}
3409
3410fn journal_file_source_type(path: &Path) -> u64 {
3411    let text = path.to_string_lossy();
3412    let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3413        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3414    };
3415    if text.contains("/remote/") {
3416        return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3417    }
3418    if local_namespace_source_name(path).is_some() {
3419        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3420    }
3421    if name.starts_with("system") {
3422        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3423    }
3424    if name.starts_with("user") {
3425        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3426    }
3427    SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3428}
3429
/// Derives a `namespace-<name>` source name from the file's parent directory.
///
/// The namespace is the text after the last `.` in the parent directory name.
/// Returns `None` when there is no parent directory name, it is not UTF-8,
/// it contains no `.`, or nothing follows the last `.`.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path
        .parent()
        .and_then(|parent| parent.file_name())
        .and_then(|name| name.to_str())?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3435
3436fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3437    let text = path.to_string_lossy();
3438    if text.contains("/remote/") {
3439        let name = path.file_name()?.to_str()?;
3440        let source = name
3441            .split_once('@')
3442            .map(|(prefix, _)| prefix)
3443            .unwrap_or_else(|| {
3444                name.strip_suffix(".journal~.zst")
3445                    .or_else(|| name.strip_suffix(".journal.zst"))
3446                    .or_else(|| name.strip_suffix(".journal~"))
3447                    .or_else(|| name.strip_suffix(".journal"))
3448                    .unwrap_or(name)
3449            });
3450        return source.starts_with("remote-").then(|| source.to_string());
3451    }
3452    local_namespace_source_name(path)
3453}
3454
3455fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3456    if field == "PRIORITY" {
3457        if let Some(priority) = priority_name_to_number(value) {
3458            return priority.as_bytes().to_vec();
3459        }
3460    }
3461    value.as_bytes().to_vec()
3462}
3463
3464fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3465    let Value::Array(items) = value? else {
3466        return None;
3467    };
3468    Some(
3469        items
3470            .iter()
3471            .filter_map(Value::as_str)
3472            .map(ToOwned::to_owned)
3473            .collect(),
3474    )
3475}
3476
3477fn request_direction(object: &Map<String, Value>) -> Direction {
3478    match get_str(object, "direction").unwrap_or("backward") {
3479        "forward" | "forwards" | "next" => Direction::Forward,
3480        _ => Direction::Backward,
3481    }
3482}
3483
3484fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3485    data_only && get_bool(object, "delta").unwrap_or(false)
3486}
3487
3488fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3489    data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3490}
3491
3492fn tail_after_realtime_bound(
3493    after_realtime_usec: Option<u64>,
3494    anchor: ExplorerAnchor,
3495) -> Option<u64> {
3496    let ExplorerAnchor::Realtime(anchor) = anchor else {
3497        return after_realtime_usec;
3498    };
3499    let tail_after = anchor.saturating_add(1);
3500    Some(
3501        after_realtime_usec
3502            .map(|after| after.max(tail_after))
3503            .unwrap_or(tail_after),
3504    )
3505}
3506
3507fn before_realtime_bound_excluding_anchor(
3508    before_realtime_usec: Option<u64>,
3509    anchor: ExplorerAnchor,
3510) -> Option<u64> {
3511    let ExplorerAnchor::Realtime(anchor) = anchor else {
3512        return before_realtime_usec;
3513    };
3514    let before_anchor = anchor.saturating_sub(1);
3515    Some(
3516        before_realtime_usec
3517            .map(|before| before.min(before_anchor))
3518            .unwrap_or(before_anchor),
3519    )
3520}
3521
3522fn request_anchor_and_direction(
3523    object: &Map<String, Value>,
3524    tail: bool,
3525    direction: Direction,
3526    after_realtime_usec: Option<u64>,
3527    before_realtime_usec: Option<u64>,
3528) -> (ExplorerAnchor, Direction) {
3529    let anchor = get_u64(object, "anchor")
3530        .filter(|value| *value != 0)
3531        .map(normalize_timestamp_to_usec)
3532        .map(ExplorerAnchor::Realtime)
3533        .unwrap_or(ExplorerAnchor::Auto);
3534    if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3535        return (anchor, Direction::Backward);
3536    }
3537    if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3538        (ExplorerAnchor::Auto, Direction::Backward)
3539    } else {
3540        (anchor, direction)
3541    }
3542}
3543
3544fn anchor_outside_window(
3545    anchor: ExplorerAnchor,
3546    after_realtime_usec: Option<u64>,
3547    before_realtime_usec: Option<u64>,
3548) -> bool {
3549    let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3550        return false;
3551    };
3552    after_realtime_usec.is_some_and(|after| anchor_usec < after)
3553        || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3554}
3555
3556fn request_limit(object: &Map<String, Value>) -> usize {
3557    get_u64(object, "last")
3558        .filter(|value| *value != 0)
3559        .map(|value| value as usize)
3560        .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3561}
3562
3563fn request_facets(
3564    requested_facets: &Option<Vec<String>>,
3565    config: &NetdataFunctionConfig,
3566) -> Vec<Vec<u8>> {
3567    requested_facets
3568        .clone()
3569        .unwrap_or_else(|| config.default_facets.clone())
3570        .into_iter()
3571        .map(Vec::from)
3572        .collect()
3573}
3574
3575fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3576    get_str(object, "histogram")
3577        .filter(|histogram| !histogram.is_empty())
3578        .map(ToOwned::to_owned)
3579}
3580
3581fn request_histogram_or_default(
3582    requested_histogram: &Option<String>,
3583    config: &NetdataFunctionConfig,
3584) -> Option<String> {
3585    requested_histogram
3586        .clone()
3587        .or_else(|| config.default_histogram.clone())
3588}
3589
3590fn request_query(object: &Map<String, Value>) -> Option<String> {
3591    get_str(object, "query")
3592        .filter(|query| !query.is_empty())
3593        .map(ToOwned::to_owned)
3594}
3595
3596fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3597    object.get(key).and_then(Value::as_bool)
3598}
3599
3600fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3601    object.get(key).and_then(Value::as_i64)
3602}
3603
3604fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3605    object.get(key).and_then(Value::as_u64)
3606}
3607
3608fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3609    object.get(key).and_then(Value::as_str)
3610}
3611
3612fn normalize_time_window(
3613    now_seconds: i64,
3614    after: Option<i64>,
3615    before: Option<i64>,
3616) -> (Option<u64>, Option<u64>) {
3617    let mut after = after.unwrap_or(0);
3618    let mut before = before.unwrap_or(0);
3619
3620    if after == 0 && before == 0 {
3621        before = now_seconds;
3622        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3623    } else {
3624        (after, before) = relative_window_to_absolute(now_seconds, after, before);
3625    }
3626
3627    if after > before {
3628        std::mem::swap(&mut after, &mut before);
3629    }
3630    if after == before {
3631        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3632    }
3633
3634    (
3635        Some(normalize_timestamp_to_usec_with_rounding(
3636            after.max(0) as u64,
3637            false,
3638        )),
3639        Some(normalize_timestamp_to_usec_with_rounding(
3640            before.max(0) as u64,
3641            true,
3642        )),
3643    )
3644}
3645
3646fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3647    let mut after = after;
3648    let mut before = before;
3649
3650    if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3651        if before > 0 {
3652            before = -before;
3653        }
3654        before = now_seconds.saturating_add(before);
3655    }
3656
3657    if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3658        if after > 0 {
3659            after = -after;
3660        }
3661        if after == 0 {
3662            after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3663        }
3664        after = before.saturating_add(after).saturating_add(1);
3665    }
3666
3667    if after > before {
3668        std::mem::swap(&mut after, &mut before);
3669    }
3670
3671    if before > now_seconds {
3672        let delta = before.saturating_sub(now_seconds);
3673        before = before.saturating_sub(delta);
3674        after = after.saturating_sub(delta);
3675    }
3676
3677    (after, before)
3678}
3679
/// Normalized request values that `normalized_request_echo` reports back.
struct RequestEchoInput<'a> {
    info: bool,
    /// Lower window bound in microseconds; echoed as whole seconds (0 when absent).
    after_realtime_usec: Option<u64>,
    /// Upper window bound in microseconds; echoed as whole seconds (0 when absent).
    before_realtime_usec: Option<u64>,
    if_modified_since_usec: u64,
    /// Only a realtime anchor is echoed as a timestamp; other anchors echo as 0.
    anchor: ExplorerAnchor,
    direction: Direction,
    /// Echoed under the `last` key.
    limit: usize,
    data_only: bool,
    delta: bool,
    tail: bool,
    sampling: u64,
    source_type: u64,
    /// Echoed under `facets` only when present.
    requested_facets: Option<&'a [String]>,
    /// Echoed under `selections` only when it is a JSON object; every
    /// `__logs_sources` entry is replaced by `null` in the echo.
    selections: Option<&'a Value>,
    histogram: Option<&'a str>,
    query: Option<&'a str>,
}
3698
3699fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3700    let anchor_usec = match input.anchor {
3701        ExplorerAnchor::Realtime(usec) => usec,
3702        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3703    };
3704    let mut out = json!({
3705        "info": input.info,
3706        // The SDK Netdata boundary always uses indexed slice semantics. The
3707        // field remains in the echo because it is part of the plugin request
3708        // shape and downstream fixtures compare normalized requests.
3709        "slice": true,
3710        "data_only": input.data_only,
3711        "delta": input.delta,
3712        "tail": input.tail,
3713        "sampling": input.sampling,
3714        "source_type": input.source_type,
3715        "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3716        "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3717        "if_modified_since": input.if_modified_since_usec,
3718        "anchor": anchor_usec,
3719        "direction": match input.direction {
3720            Direction::Forward => "forward",
3721            Direction::Backward => "backward",
3722        },
3723        "last": input.limit,
3724        "query": input.query,
3725        "histogram": input.histogram,
3726    });
3727    if let Some(facets) = input.requested_facets {
3728        if let Some(object) = out.as_object_mut() {
3729            object.insert(
3730                "facets".to_string(),
3731                facets
3732                    .iter()
3733                    .map(|field| Value::String(field.clone()))
3734                    .collect(),
3735            );
3736        }
3737    }
3738    if let Some(Value::Object(selections)) = input.selections {
3739        let mut selections = selections.clone();
3740        if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3741            for source in sources {
3742                *source = Value::Null;
3743            }
3744        }
3745        if let Some(object) = out.as_object_mut() {
3746            object.insert("selections".to_string(), Value::Object(selections));
3747        }
3748    }
3749    out
3750}
3751
3752fn normalize_timestamp_to_usec(value: u64) -> u64 {
3753    normalize_timestamp_to_usec_with_rounding(value, false)
3754}
3755
/// Normalizes a timestamp to microseconds.
///
/// Values of at least 1_000_000_000_000 are returned unchanged. Smaller
/// values are multiplied by 1_000_000 (saturating); with `end_of_second`
/// set, 999_999 is added so the result is the last microsecond of that
/// second.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const ALREADY_USEC_THRESHOLD: u64 = 1_000_000_000_000;
    const USEC_PER_SECOND: u64 = 1_000_000;

    if value >= ALREADY_USEC_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(USEC_PER_SECOND);
    if end_of_second {
        start_of_second.saturating_add(USEC_PER_SECOND - 1)
    } else {
        start_of_second
    }
}
3765
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or 0 when the system clock reports a time before the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3772
/// Formats a byte count with binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Values below 1024 are printed as exact bytes. Larger values are scaled to
/// the biggest unit available (capped at `TiB`) and printed with at most two
/// decimals, with trailing zeros and a dangling decimal point removed.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];

    if bytes < 1024 {
        return format!("{}{}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut index = 0usize;
    while scaled >= 1024.0 && index + 1 < UNITS.len() {
        scaled /= 1024.0;
        index += 1;
    }
    let suffix = UNITS[index];

    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{suffix}");
    }
    // `{:.2}` always emits a decimal point, so stripping zeros first and the
    // point second can never eat into the integer part.
    let rounded = format!("{scaled:.2}");
    let trimmed = rounded.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{suffix}")
}
3796
/// Formats a duration in seconds as space-separated components such as
/// `1y 2mo 3d 4h 5m 6s`.
///
/// A year counts as 365 days and a month as 30 days. Zero components are
/// omitted; a zero duration is rendered as `0s`.
fn human_duration_seconds(seconds: u64) -> String {
    const UNITS: [(u64, &str); 6] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
        (1, "s"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for &(unit_seconds, suffix) in UNITS.iter() {
        let count = remaining / unit_seconds;
        remaining %= unit_seconds;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3828
/// Outcome of scanning a directory tree for journal files.
#[derive(Debug, Default)]
struct JournalFileCollection {
    /// Journal files found, sorted and de-duplicated by canonical path.
    files: Vec<PathBuf>,
    /// Number of directories that were not scanned (scan limit hit or unreadable).
    skipped: u64,
    /// One human-readable message per skipped directory.
    errors: Vec<String>,
}
3835
3836fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
3837    if !path.is_dir() {
3838        return Err(SdkError::InvalidPath(format!(
3839            "not a directory: {}",
3840            path.display()
3841        )));
3842    }
3843    let mut collection = JournalFileCollection::default();
3844    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
3845    let mut visited = HashSet::new();
3846    while let Some((directory, depth)) = pending.pop_front() {
3847        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
3848        if visited.contains(&visited_key) {
3849            continue;
3850        }
3851        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
3852            collection.skipped = collection.skipped.saturating_add(1);
3853            collection.errors.push(format!(
3854                "{}: directory scan limit reached",
3855                directory.display()
3856            ));
3857            continue;
3858        }
3859        visited.insert(visited_key);
3860        let entries = match std::fs::read_dir(&directory) {
3861            Ok(entries) => entries,
3862            Err(err) if directory == path => return Err(err.into()),
3863            Err(err) => {
3864                collection.skipped = collection.skipped.saturating_add(1);
3865                collection
3866                    .errors
3867                    .push(format!("{}: {err}", directory.display()));
3868                continue;
3869            }
3870        };
3871        for entry in entries.flatten() {
3872            let entry_path = entry.path();
3873            if entry_path.is_file() && is_journal_file_name(&entry_path) {
3874                collection.files.push(entry_path);
3875            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
3876                pending.push_back((entry_path, depth + 1));
3877            }
3878        }
3879    }
3880    collection.files.sort();
3881    dedup_journal_files_by_canonical_path(&mut collection.files);
3882    Ok(collection)
3883}
3884
/// Removes paths that resolve to an already-seen canonical path, keeping the
/// first occurrence and the original order.
///
/// A path that cannot be canonicalized is compared by its literal value.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut canonical_seen = HashSet::new();
    files.retain(|candidate| {
        let identity = match std::fs::canonicalize(candidate) {
            Ok(resolved) => resolved,
            Err(_) => candidate.clone(),
        };
        // `insert` is true only the first time an identity is observed.
        canonical_seen.insert(identity)
    });
}
3892
/// Timestamps used to order journal files relative to each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime of the first entry; 0 when the file could not be opened.
    msg_first_realtime_usec: u64,
    /// Realtime of the last entry, or the file modification time when unknown.
    msg_last_realtime_usec: u64,
    /// File modification time in microseconds since the Unix epoch.
    file_last_modified_usec: u64,
    journal_vs_realtime_delta_usec: u64,
}
3900
3901fn journal_file_order_info(
3902    path: &Path,
3903    reader_options: ReaderOptions,
3904    metadata: Option<&NetdataJournalFileMetadata>,
3905) -> JournalFileOrderInfo {
3906    let file_last_modified_usec = std::fs::metadata(path)
3907        .ok()
3908        .and_then(|metadata| metadata.modified().ok())
3909        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3910        .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3911        .unwrap_or_default();
3912    let file_last_modified_usec = metadata
3913        .and_then(|metadata| metadata.file_last_modified_usec)
3914        .unwrap_or(file_last_modified_usec);
3915    let journal_vs_realtime_delta_usec = metadata
3916        .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3917        .map(normalize_journal_vs_realtime_delta_usec)
3918        .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3919
3920    let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3921        return JournalFileOrderInfo {
3922            msg_first_realtime_usec: 0,
3923            msg_last_realtime_usec: file_last_modified_usec,
3924            file_last_modified_usec,
3925            journal_vs_realtime_delta_usec,
3926        };
3927    };
3928    let header = reader.header();
3929    let msg_first_realtime_usec = metadata
3930        .and_then(|metadata| metadata.msg_first_realtime_usec)
3931        .unwrap_or(header.head_entry_realtime);
3932    let msg_last_realtime_usec = metadata
3933        .and_then(|metadata| metadata.msg_last_realtime_usec)
3934        .unwrap_or_else(|| {
3935            if header.tail_entry_realtime == 0 {
3936                file_last_modified_usec
3937            } else {
3938                header.tail_entry_realtime
3939            }
3940        });
3941    JournalFileOrderInfo {
3942        msg_first_realtime_usec,
3943        msg_last_realtime_usec,
3944        file_last_modified_usec,
3945        journal_vs_realtime_delta_usec,
3946    }
3947}
3948
3949fn compare_journal_file_order(
3950    left: &JournalFileOrderInfo,
3951    right: &JournalFileOrderInfo,
3952    direction: Direction,
3953) -> Ordering {
3954    let backward = right
3955        .msg_last_realtime_usec
3956        .cmp(&left.msg_last_realtime_usec)
3957        .then_with(|| {
3958            right
3959                .file_last_modified_usec
3960                .cmp(&left.file_last_modified_usec)
3961        })
3962        .then_with(|| {
3963            right
3964                .msg_first_realtime_usec
3965                .cmp(&left.msg_first_realtime_usec)
3966        });
3967    match direction {
3968        Direction::Backward => backward,
3969        Direction::Forward => backward.reverse(),
3970    }
3971}
3972
/// Reports whether the path's file name carries a journal file suffix:
/// `.journal`, `.journal~`, `.journal.zst` or `.journal~.zst`.
///
/// Paths without a UTF-8 file name are never journal files.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] = [".journal", ".journal~", ".journal.zst", ".journal~.zst"];

    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => JOURNAL_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3983
3984fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3985    for value in values {
3986        push_unique(target, value);
3987    }
3988}
3989
/// Converts raw field names to strings, dropping any that are not valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    let mut names = Vec::new();
    for field in fields {
        if let Ok(text) = std::str::from_utf8(field) {
            names.push(text.to_owned());
        }
    }
    names
}
3996
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    let already_present = target
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
4003
/// Builds the key used to reorder names: leading ASCII punctuation is
/// removed and ASCII letters are lowercased.
fn netdata_reorder_key(value: &str) -> String {
    let first_kept = value
        .find(|character: char| !character.is_ascii_punctuation())
        .unwrap_or(value.len());
    value[first_kept..].to_ascii_lowercase()
}
4009
4010fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
4011    histogram
4012        .buckets
4013        .first()
4014        .map(|bucket| {
4015            bucket
4016                .end_realtime_usec
4017                .saturating_sub(bucket.start_realtime_usec)
4018                .checked_div(1_000_000)
4019                .unwrap_or(1)
4020                .max(1)
4021        })
4022        .unwrap_or(1)
4023}
4024
/// Resolution used by `format_realtime_usec` when rendering a timestamp.
enum TimestampPrecision {
    /// Whole seconds: `%Y-%m-%dT%H:%M:%SZ`.
    Seconds,
    /// Six fractional digits: `%Y-%m-%dT%H:%M:%S%.6fZ`.
    Micros,
}
4029
4030fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
4031    let seconds = (timestamp / 1_000_000) as i64;
4032    let micros = (timestamp % 1_000_000) as u32;
4033    DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4034        .map(|datetime| match precision {
4035            TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4036            TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4037        })
4038        .unwrap_or_else(|| timestamp.to_string())
4039}
4040
4041fn priority_name(raw: &str) -> Option<&'static str> {
4042    match parse_priority(raw)? {
4043        0 => Some("panic"),
4044        1 => Some("alert"),
4045        2 => Some("critical"),
4046        3 => Some("error"),
4047        4 => Some("warning"),
4048        5 => Some("notice"),
4049        6 => Some("info"),
4050        7 => Some("debug"),
4051        _ => None,
4052    }
4053}
4054
/// Maps a priority level name (or one of its accepted aliases) to its numeric
/// string, `"0"` through `"7"`.
///
/// Matching is exact and case-sensitive; unknown names yield `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const LEVELS: [(&[&str], &str); 8] = [
        (&["panic", "emergency", "emerg"], "0"),
        (&["alert"], "1"),
        (&["critical", "crit"], "2"),
        (&["error", "err"], "3"),
        (&["warning", "warn"], "4"),
        (&["notice"], "5"),
        (&["info"], "6"),
        (&["debug"], "7"),
    ];
    LEVELS
        .iter()
        .find(|(names, _)| names.iter().any(|name| *name == value))
        .map(|(_, number)| *number)
}
4068
/// Parses a priority given as a decimal string that fits in a `u8`.
fn parse_priority(raw: &str) -> Option<u8> {
    match raw.parse::<u8>() {
        Ok(level) => Some(level),
        Err(_) => None,
    }
}
4072
4073fn priority_to_row_severity(raw: &[u8]) -> &'static str {
4074    let raw = String::from_utf8_lossy(raw);
4075    match parse_priority(&raw) {
4076        Some(priority) if priority <= 3 => "critical",
4077        Some(4) => "warning",
4078        Some(5) => "notice",
4079        Some(priority) if priority >= 7 => "debug",
4080        _ => "normal",
4081    }
4082}
4083
/// Maps a numeric syslog facility string to its name.
///
/// Codes 0-11 and 16-23 (`local0`..`local7`) are named; everything else,
/// including input that is not a `u8`, yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    // Indexed by facility code 0..=11.
    const STANDARD: [&str; 12] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron",
        "authpriv", "ftp",
    ];
    // Indexed by facility code minus 16.
    const LOCAL: [&str; 8] = [
        "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7",
    ];

    let code = raw.parse::<u8>().ok()?;
    match code {
        0..=11 => Some(STANDARD[usize::from(code)]),
        16..=23 => Some(LOCAL[usize::from(code - 16)]),
        _ => None,
    }
}
4109
/// Errno number to symbolic name pairs, looked up by `errno_name`.
///
/// Entries are listed in ascending numeric order from 1 to 133; the numbers
/// 41 and 58 have no entry.
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"),
    (2, "ENOENT"),
    (3, "ESRCH"),
    (4, "EINTR"),
    (5, "EIO"),
    (6, "ENXIO"),
    (7, "E2BIG"),
    (8, "ENOEXEC"),
    (9, "EBADF"),
    (10, "ECHILD"),
    (11, "EAGAIN"),
    (12, "ENOMEM"),
    (13, "EACCES"),
    (14, "EFAULT"),
    (15, "ENOTBLK"),
    (16, "EBUSY"),
    (17, "EEXIST"),
    (18, "EXDEV"),
    (19, "ENODEV"),
    (20, "ENOTDIR"),
    (21, "EISDIR"),
    (22, "EINVAL"),
    (23, "ENFILE"),
    (24, "EMFILE"),
    (25, "ENOTTY"),
    (26, "ETXTBSY"),
    (27, "EFBIG"),
    (28, "ENOSPC"),
    (29, "ESPIPE"),
    (30, "EROFS"),
    (31, "EMLINK"),
    (32, "EPIPE"),
    (33, "EDOM"),
    (34, "ERANGE"),
    (35, "EDEADLK"),
    (36, "ENAMETOOLONG"),
    (37, "ENOLCK"),
    (38, "ENOSYS"),
    (39, "ENOTEMPTY"),
    (40, "ELOOP"),
    (42, "ENOMSG"),
    (43, "EIDRM"),
    (44, "ECHRNG"),
    (45, "EL2NSYNC"),
    (46, "EL3HLT"),
    (47, "EL3RST"),
    (48, "ELNRNG"),
    (49, "EUNATCH"),
    (50, "ENOCSI"),
    (51, "EL2HLT"),
    (52, "EBADE"),
    (53, "EBADR"),
    (54, "EXFULL"),
    (55, "ENOANO"),
    (56, "EBADRQC"),
    (57, "EBADSLT"),
    (59, "EBFONT"),
    (60, "ENOSTR"),
    (61, "ENODATA"),
    (62, "ETIME"),
    (63, "ENOSR"),
    (64, "ENONET"),
    (65, "ENOPKG"),
    (66, "EREMOTE"),
    (67, "ENOLINK"),
    (68, "EADV"),
    (69, "ESRMNT"),
    (70, "ECOMM"),
    (71, "EPROTO"),
    (72, "EMULTIHOP"),
    (73, "EDOTDOT"),
    (74, "EBADMSG"),
    (75, "EOVERFLOW"),
    (76, "ENOTUNIQ"),
    (77, "EBADFD"),
    (78, "EREMCHG"),
    (79, "ELIBACC"),
    (80, "ELIBBAD"),
    (81, "ELIBSCN"),
    (82, "ELIBMAX"),
    (83, "ELIBEXEC"),
    (84, "EILSEQ"),
    (85, "ERESTART"),
    (86, "ESTRPIPE"),
    (87, "EUSERS"),
    (88, "ENOTSOCK"),
    (89, "EDESTADDRREQ"),
    (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"),
    (92, "ENOPROTOOPT"),
    (93, "EPROTONOSUPPORT"),
    (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"),
    (96, "EPFNOSUPPORT"),
    (97, "EAFNOSUPPORT"),
    (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"),
    (100, "ENETDOWN"),
    (101, "ENETUNREACH"),
    (102, "ENETRESET"),
    (103, "ECONNABORTED"),
    (104, "ECONNRESET"),
    (105, "ENOBUFS"),
    (106, "EISCONN"),
    (107, "ENOTCONN"),
    (108, "ESHUTDOWN"),
    (109, "ETOOMANYREFS"),
    (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"),
    (112, "EHOSTDOWN"),
    (113, "EHOSTUNREACH"),
    (114, "EALREADY"),
    (115, "EINPROGRESS"),
    (116, "ESTALE"),
    (117, "EUCLEAN"),
    (118, "ENOTNAM"),
    (119, "ENAVAIL"),
    (120, "EISNAM"),
    (121, "EREMOTEIO"),
    (122, "EDQUOT"),
    (123, "ENOMEDIUM"),
    (124, "EMEDIUMTYPE"),
    (125, "ECANCELED"),
    (126, "ENOKEY"),
    (127, "EKEYEXPIRED"),
    (128, "EKEYREVOKED"),
    (129, "EKEYREJECTED"),
    (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"),
    (132, "ERFKILL"),
    (133, "EHWPOISON"),
];
4243
4244fn errno_name(raw: &str) -> Option<String> {
4245    let errno = raw.parse::<u32>().ok()?;
4246    let name = ERRNO_NAMES
4247        .iter()
4248        .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
4249    Some(format!("{errno} ({name})"))
4250}
4251
/// Annotates a hexadecimal capability mask with the names of its set bits,
/// e.g. `3` becomes `3 (CHOWN | DAC_OVERRIDE)`.
///
/// The input is returned unchanged when it does not start with an ASCII
/// digit, is not valid hexadecimal for a `u64`, is zero, or sets no bit that
/// has a name in the table below.
fn cap_effective_display(raw: &str) -> String {
    // Capability names indexed by bit position in the mask.
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if starts_with_digit && mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names: Vec<&str> = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if (mask >> bit) & 1 == 1 {
            names.push(*name);
        }
    }
    if names.is_empty() {
        return raw.to_string();
    }
    format!("{raw} ({})", names.join(" | "))
}
4316
4317fn systemd_field_display_value(
4318    context: &DisplayContext,
4319    scope: DisplayScope,
4320    field: &str,
4321    value: &[u8],
4322    resolve_user_group_names: bool,
4323) -> Value {
4324    let raw = String::from_utf8_lossy(value);
4325    match field {
4326        "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
4327        "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
4328        "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
4329        "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
4330            (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
4331            (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
4332            (None, _) => raw.into_owned(),
4333        }),
4334        "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
4335            (Some(timestamp), DisplayScope::Data) => format!(
4336                "{} ({})  ",
4337                raw,
4338                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4339            ),
4340            (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
4341                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4342            }
4343            (None, _) => raw.into_owned(),
4344        }),
4345        "_UID"
4346        | "_SYSTEMD_OWNER_UID"
4347        | "OBJECT_SYSTEMD_OWNER_UID"
4348        | "OBJECT_UID"
4349        | "_AUDIT_LOGINUID"
4350        | "OBJECT_AUDIT_LOGINUID" => {
4351            if resolve_user_group_names {
4352                Value::String(cached_uid_display(context, raw.as_ref()))
4353            } else {
4354                Value::String(raw.into_owned())
4355            }
4356        }
4357        "_GID" | "OBJECT_GID" => {
4358            if resolve_user_group_names {
4359                Value::String(cached_gid_display(context, raw.as_ref()))
4360            } else {
4361                Value::String(raw.into_owned())
4362            }
4363        }
4364        "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
4365        "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
4366            Ok(timestamp) if timestamp != 0 => {
4367                format!(
4368                    "{} ({})",
4369                    raw,
4370                    format_realtime_usec(timestamp, TimestampPrecision::Micros)
4371                )
4372            }
4373            _ => raw.into_owned(),
4374        }),
4375        _ => Value::String(raw.into_owned()),
4376    }
4377}
4378
4379fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4380    if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4381        return value.clone();
4382    }
4383    let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4384    context
4385        .uid_display_cache
4386        .borrow_mut()
4387        .insert(raw.to_string(), value.clone());
4388    value
4389}
4390
4391fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4392    if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4393        return value.clone();
4394    }
4395    let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4396    context
4397        .gid_display_cache
4398        .borrow_mut()
4399        .insert(raw.to_string(), value.clone());
4400    value
4401}
4402
4403#[cfg(unix)]
4404fn resolve_uid_name(raw: &str) -> Option<String> {
4405    let uid = raw.parse::<libc::uid_t>().ok()?;
4406    let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4407    let mut result = std::ptr::null_mut();
4408    let mut buffer = vec![0i8; 16_384];
4409    let rc = unsafe {
4410        libc::getpwuid_r(
4411            uid,
4412            pwd.as_mut_ptr(),
4413            buffer.as_mut_ptr(),
4414            buffer.len(),
4415            &mut result,
4416        )
4417    };
4418    if rc != 0 || result.is_null() {
4419        return None;
4420    }
4421    let pwd = unsafe { pwd.assume_init() };
4422    Some(
4423        unsafe { CStr::from_ptr(pwd.pw_name) }
4424            .to_string_lossy()
4425            .into_owned(),
4426    )
4427}
4428
/// Non-Unix stand-in for the UID lookup: no resolution is attempted and the
/// result is always `None`.
#[cfg(not(unix))]
fn resolve_uid_name(_raw: &str) -> Option<String> {
    None
}
4433
4434#[cfg(unix)]
4435fn resolve_gid_name(raw: &str) -> Option<String> {
4436    let gid = raw.parse::<libc::gid_t>().ok()?;
4437    let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4438    let mut result = std::ptr::null_mut();
4439    let mut buffer = vec![0i8; 16_384];
4440    let rc = unsafe {
4441        libc::getgrgid_r(
4442            gid,
4443            grp.as_mut_ptr(),
4444            buffer.as_mut_ptr(),
4445            buffer.len(),
4446            &mut result,
4447        )
4448    };
4449    if rc != 0 || result.is_null() {
4450        return None;
4451    }
4452    let grp = unsafe { grp.assume_init() };
4453    Some(
4454        unsafe { CStr::from_ptr(grp.gr_name) }
4455            .to_string_lossy()
4456            .into_owned(),
4457    )
4458}
4459
/// Non-Unix stand-in for the GID lookup: no resolution is attempted and the
/// result is always `None`.
#[cfg(not(unix))]
fn resolve_gid_name(_raw: &str) -> Option<String> {
    None
}
4464
/// Lookup table pairing a `MESSAGE_ID` value (32 lowercase hex characters, no
/// dashes) with the human-readable name shown for it.
///
/// Entries are matched by exact string equality, so the ids must stay in
/// precisely this textual form.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4780
4781fn message_id_name(raw: &str) -> Option<&'static str> {
4782    MESSAGE_ID_NAMES
4783        .iter()
4784        .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4785}
4786
4787#[cfg(test)]
4788mod tests {
4789    use super::*;
4790    use crate::ExplorerHistogramBucket;
4791    use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4792    use journal_core::repository::File as RepoFile;
4793    use std::cell::Cell;
4794    use std::collections::HashMap;
4795    use tempfile::TempDir;
4796
    /// In-memory `NetdataFunctionState` implementation used by the tests in
    /// this module.
    #[derive(Default)]
    struct TestNetdataState {
        /// Metadata handed back by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        /// Every `(path, delta_usec)` pair received through
        /// `update_file_journal_vs_realtime_delta_usec`, in call order.
        updates: Vec<(PathBuf, u64)>,
    }
4802
4803    impl NetdataFunctionState for TestNetdataState {
4804        fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4805            self.metadata.get(path).cloned()
4806        }
4807
4808        fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4809            self.updates.push((path.to_path_buf(), delta_usec));
4810        }
4811    }
4812
4813    fn test_uuid(seed: u8) -> uuid::Uuid {
4814        uuid::Uuid::from_bytes([seed; 16])
4815    }
4816
4817    fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4818        write_named_netdata_test_journal(
4819            directory,
4820            "netdata-api-test.journal",
4821            count,
4822            1_700_000_000_000_000,
4823        );
4824    }
4825
4826    fn write_named_netdata_test_journal(
4827        directory: &std::path::Path,
4828        name: &str,
4829        count: usize,
4830        start_realtime_usec: u64,
4831    ) {
4832        write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4833    }
4834
4835    fn write_stepped_netdata_test_journal(
4836        directory: &std::path::Path,
4837        name: &str,
4838        count: usize,
4839        start_realtime_usec: u64,
4840        step_realtime_usec: u64,
4841    ) {
4842        std::fs::create_dir_all(directory).expect("create journal dir");
4843        let path = directory.join(name);
4844        let repo_file = RepoFile::from_path(&path).expect("repo file");
4845        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4846        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4847        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4848        for index in 0..count {
4849            let message = format!("MESSAGE=row-{index}");
4850            let service = if index % 2 == 0 {
4851                b"SERVICE=even".as_slice()
4852            } else {
4853                b"SERVICE=odd".as_slice()
4854            };
4855            let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4856            let realtime = start_realtime_usec
4857                .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4858            writer
4859                .add_entry(&mut file, &payloads, realtime, realtime)
4860                .expect("write entry");
4861        }
4862        file.sync().expect("sync journal");
4863    }
4864
    /// Rows accumulated across the pages fetched by `collect_netdata_pages`.
    struct NetdataCollectedPages {
        /// `MESSAGE` column values, in the order the pages returned them.
        messages: Vec<String>,
        /// `timestamp` column values, parallel to `messages`.
        timestamps: Vec<u64>,
    }
4869
4870    fn collect_netdata_pages(
4871        directory: &std::path::Path,
4872        direction: Direction,
4873        page_size: usize,
4874    ) -> NetdataCollectedPages {
4875        let mut out = NetdataCollectedPages {
4876            messages: Vec::new(),
4877            timestamps: Vec::new(),
4878        };
4879        let mut anchor = None;
4880        for page in 0..100 {
4881            let mut request = json!({
4882                "after": 1_700_000_000,
4883                "before": 1_800_000_000,
4884                "last": page_size,
4885                "direction": direction_name(direction),
4886                "data_only": true,
4887            });
4888            if let Some(anchor) = anchor {
4889                request["anchor"] = Value::from(anchor);
4890            }
4891            let response = run_netdata_contract_request(directory, request);
4892            assert_eq!(
4893                response["status"], 200,
4894                "page {page} response = {response:#}"
4895            );
4896            let messages = response_column_strings(&response, "MESSAGE");
4897            let timestamps = response_column_u64s(&response, "timestamp");
4898            assert_eq!(messages.len(), timestamps.len());
4899            if messages.is_empty() {
4900                break;
4901            }
4902            out.messages.extend(messages);
4903            out.timestamps.extend(timestamps.iter().copied());
4904            anchor = Some(match direction {
4905                Direction::Forward => timestamps[0],
4906                Direction::Backward => *timestamps.last().expect("page timestamp"),
4907            });
4908            if timestamps.len() < page_size {
4909                break;
4910            }
4911        }
4912        assert!(!out.messages.is_empty(), "pagination returned no rows");
4913        out
4914    }
4915
4916    fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4917        NetdataJournalFunction::systemd_journal_plugin_compatible()
4918            .run_directory_request_json_with_options(
4919                directory,
4920                &request,
4921                NetdataFunctionRunOptions::from_timeout_seconds(0),
4922            )
4923            .expect("run function")
4924    }
4925
4926    fn direction_name(direction: Direction) -> &'static str {
4927        match direction {
4928            Direction::Forward => "forward",
4929            Direction::Backward => "backward",
4930        }
4931    }
4932
4933    fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4934        let index = response_column_index(response, field);
4935        response["data"]
4936            .as_array()
4937            .expect("data")
4938            .iter()
4939            .map(|row| {
4940                row.as_array().expect("row")[index]
4941                    .as_str()
4942                    .expect("string cell")
4943                    .to_string()
4944            })
4945            .collect()
4946    }
4947
4948    fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4949        let index = response_column_index(response, field);
4950        response["data"]
4951            .as_array()
4952            .expect("data")
4953            .iter()
4954            .map(|row| {
4955                row.as_array().expect("row")[index]
4956                    .as_u64()
4957                    .expect("u64 cell")
4958            })
4959            .collect()
4960    }
4961
4962    fn response_column_index(response: &Value, field: &str) -> usize {
4963        response["columns"][field]["index"]
4964            .as_u64()
4965            .expect("column index") as usize
4966    }
4967
4968    fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4969        for facet in response[key].as_array().expect("facets") {
4970            if facet["id"] != field {
4971                continue;
4972            }
4973            for option in facet["options"].as_array().expect("facet options") {
4974                if option["id"] == value {
4975                    return option["count"].as_u64().expect("facet count");
4976                }
4977            }
4978            panic!("{key} facet {field} missing value {value}");
4979        }
4980        panic!("{key} missing facet {field}");
4981    }
4982
4983    fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4984        let chart = &response[key]["chart"];
4985        let names = chart["view"]["dimensions"]["names"]
4986            .as_array()
4987            .expect("histogram names");
4988        let dimension_index = names
4989            .iter()
4990            .position(|name| name.as_str() == Some(value))
4991            .expect("histogram dimension");
4992        chart["result"]["data"]
4993            .as_array()
4994            .expect("histogram data")
4995            .iter()
4996            .map(|point| {
4997                point.as_array().expect("histogram point")[dimension_index + 1]
4998                    .as_array()
4999                    .expect("histogram cell")[0]
5000                    .as_u64()
5001                    .unwrap_or_default()
5002            })
5003            .sum()
5004    }
5005
5006    fn assert_unique_messages(messages: &[String]) {
5007        let mut seen = HashSet::new();
5008        for message in messages {
5009            assert!(
5010                seen.insert(message),
5011                "duplicate message {message} in {messages:?}"
5012            );
5013        }
5014    }
5015
5016    #[test]
5017    fn parses_netdata_selections_as_and_fields_or_values() {
5018        let request = json!({
5019            "after": 200_000_000,
5020            "before": 200_000_100,
5021            "direction": "forward",
5022            "last": 25,
5023            "facets": ["PRIORITY"],
5024            "selections": {
5025                "PRIORITY": ["warning", "error"],
5026                "_HOSTNAME": ["node-a"],
5027                "__logs_sources": ["all-local-system-logs"],
5028            }
5029        });
5030
5031        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5032            .expect("parse request");
5033        assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5034        assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5035        assert_eq!(parsed.direction, Direction::Forward);
5036        assert_eq!(parsed.limit, 25);
5037        assert_eq!(parsed.filters.len(), 2);
5038        assert_eq!(parsed.filters[0].field, b"PRIORITY");
5039        assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5040        assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5041        assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5042    }
5043
5044    #[test]
5045    fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5046        let request = json!({
5047            "after": 200_000_000,
5048            "before": 200_000_100,
5049            "last": 1
5050        });
5051
5052        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5053            .expect("parse request");
5054        let query =
5055            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5056
5057        assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5058        assert_eq!(parsed.limit, 2);
5059        assert_eq!(query.limit, 2);
5060    }
5061
5062    #[test]
5063    fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5064        let request = json!({
5065            "after": 200_000_000,
5066            "before": 200_000_100,
5067            "facets": ["PRIORITY"],
5068            "selections": {
5069                "PRIORITY": ["warning"]
5070            }
5071        });
5072
5073        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5074            .expect("parse request");
5075        let query =
5076            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5077
5078        assert!(!query.exclude_facet_field_filters);
5079    }
5080
5081    #[test]
5082    fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5083        let request = json!({
5084            "after": 200_000_000,
5085            "before": 200_000_100,
5086            "facets": ["PRIORITY", "_BOOT_ID"],
5087            "selections": {
5088                "PRIORITY": ["warning"],
5089                "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5090            }
5091        });
5092
5093        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5094            .expect("parse request");
5095        let query =
5096            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5097
5098        assert!(query.exclude_facet_field_filters);
5099    }
5100
5101    #[test]
5102    fn parses_netdata_fts_query_like_simple_pattern() {
5103        let (terms, positives, negatives) =
5104            parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5105
5106        assert_eq!(
5107            positives,
5108            vec![
5109                b"error".to_vec(),
5110                b"warning".to_vec(),
5111                b"escaped|pipe".to_vec(),
5112                b"!literal".to_vec(),
5113                b" a*B".to_vec(),
5114            ]
5115        );
5116        assert_eq!(negatives, vec![b"debug".to_vec()]);
5117        assert_eq!(terms.len(), 6);
5118        assert!(!terms[0].negative);
5119        assert!(terms[2].negative);
5120        assert_eq!(
5121            terms[5],
5122            ExplorerFtsPattern {
5123                parts: vec![b" a".to_vec(), b"B".to_vec()],
5124                negative: false,
5125            }
5126        );
5127
5128        let request = json!({
5129            "query": r"alpha|!debug|needle\|pipe",
5130            "facets": ["PRIORITY"],
5131        });
5132        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5133            .expect("parse request");
5134        let query =
5135            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5136        assert_eq!(
5137            query.fts_patterns,
5138            vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5139        );
5140        assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5141        assert_eq!(query.fts_terms.len(), 3);
5142    }
5143
5144    #[test]
5145    fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5146        let request = json!({
5147            "facets": ["PRIORITY", "_HOSTNAME"],
5148            "histogram": "PRIORITY",
5149            "last": 25
5150        });
5151
5152        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5153            .expect("parse request");
5154        let query =
5155            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5156
5157        assert!(!query.debug_collect_column_fields_by_row_traversal);
5158    }
5159
5160    #[test]
5161    fn netdata_function_reports_requested_facet_groups_even_when_absent() {
5162        let dir = TempDir::new().expect("tempdir");
5163        write_netdata_test_journal(dir.path(), 10);
5164        let request = json!({
5165            "after": 1_700_000_000,
5166            "before": 1_700_000_010,
5167            "facets": ["SERVICE", "MISSING_FIELD"],
5168            "histogram": "SERVICE",
5169            "last": 5,
5170            "slice": true
5171        });
5172        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5173
5174        let response = function
5175            .run_directory_request_json_with_options(
5176                dir.path(),
5177                &request,
5178                NetdataFunctionRunOptions::from_timeout_seconds(0),
5179            )
5180            .expect("run function");
5181
5182        let columns = response["columns"].as_object().expect("columns");
5183        assert!(columns.contains_key("SERVICE"));
5184        assert!(columns.contains_key("MISSING_FIELD"));
5185        let facets = response["facets"].as_array().expect("facets");
5186        assert_eq!(
5187            facets
5188                .iter()
5189                .filter_map(|facet| facet["id"].as_str())
5190                .collect::<Vec<_>>(),
5191            vec!["SERVICE", "MISSING_FIELD"]
5192        );
5193        assert_eq!(
5194            facets[1]["options"].as_array().expect("missing options"),
5195            &Vec::<Value>::new()
5196        );
5197        let accepted = response["accepted_params"]
5198            .as_array()
5199            .expect("accepted params");
5200        assert!(accepted.iter().any(|value| value == "SERVICE"));
5201        assert!(accepted.iter().any(|value| value == "MISSING_FIELD"));
5202        let histograms = response["available_histograms"]
5203            .as_array()
5204            .expect("available histograms");
5205        assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5206        assert!(
5207            histograms
5208                .iter()
5209                .any(|value| value["id"] == "MISSING_FIELD")
5210        );
5211    }
5212
5213    #[test]
5214    fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5215        let dir = TempDir::new().expect("tempdir");
5216        write_netdata_test_journal(dir.path(), 10);
5217        let request = json!({
5218            "after": 1_700_000_000,
5219            "before": 1_700_000_010,
5220            "facets": ["PRIORITY"],
5221            "histogram": "PRIORITY",
5222            "selections": {
5223                "SERVICE": ["missing"]
5224            },
5225            "last": 5,
5226            "slice": true
5227        });
5228        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5229
5230        let response = function
5231            .run_directory_request_json_with_options(
5232                dir.path(),
5233                &request,
5234                NetdataFunctionRunOptions::from_timeout_seconds(0),
5235            )
5236            .expect("run function");
5237
5238        let facets = response["facets"].as_array().expect("facets");
5239        assert_eq!(facets.len(), 1);
5240        assert_eq!(facets[0]["id"], "PRIORITY");
5241        let options = facets[0]["options"].as_array().expect("options");
5242        assert!(options.iter().any(|option| {
5243            option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5244        }));
5245        assert_eq!(response["items"]["matched"], 0);
5246    }
5247
5248    #[test]
5249    fn netdata_function_api_reports_progress() {
5250        let dir = TempDir::new().expect("tempdir");
5251        write_netdata_test_journal(dir.path(), 9_000);
5252        let request = json!({
5253            "after": 1_700_000_000,
5254            "before": 1_700_000_010,
5255            "facets": ["SERVICE"],
5256            "histogram": "SERVICE",
5257            "last": 0
5258        });
5259        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5260        let mut reports = 0u64;
5261        let mut progress = |progress: NetdataFunctionProgress| {
5262            reports = reports.saturating_add(1);
5263            assert_eq!(progress.current_file, 1);
5264            assert_eq!(progress.total_files, 1);
5265            assert!(progress.stats.rows_examined <= 9_000);
5266        };
5267        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5268        options.progress_interval = Duration::ZERO;
5269        options.progress_callback = Some(&mut progress);
5270
5271        let response = function
5272            .run_directory_request_json_with_options(dir.path(), &request, options)
5273            .expect("run function");
5274
5275        assert_eq!(response["status"], 200);
5276        assert!(reports > 0);
5277        assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5278    }
5279
5280    #[test]
5281    fn netdata_function_api_reports_file_end_progress_for_small_scans() {
5282        let dir = TempDir::new().expect("tempdir");
5283        write_netdata_test_journal(dir.path(), 10);
5284        let request = json!({
5285            "after": 1_700_000_000,
5286            "before": 1_700_000_010,
5287            "facets": ["SERVICE"],
5288            "histogram": "SERVICE",
5289            "last": 0
5290        });
5291        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5292        let mut reports = 0u64;
5293        let mut last_rows_examined = 0u64;
5294        let mut progress = |progress: NetdataFunctionProgress| {
5295            reports = reports.saturating_add(1);
5296            last_rows_examined = progress.stats.rows_examined;
5297        };
5298        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5299        options.progress_callback = Some(&mut progress);
5300
5301        let response = function
5302            .run_directory_request_json_with_options(dir.path(), &request, options)
5303            .expect("run function");
5304
5305        assert_eq!(response["status"], 200);
5306        assert_eq!(reports, 1);
5307        assert_eq!(last_rows_examined, 10);
5308    }
5309
5310    #[test]
5311    fn netdata_function_progress_counts_only_query_files() {
5312        let dir = TempDir::new().expect("tempdir");
5313        write_named_netdata_test_journal(
5314            dir.path(),
5315            "old-window.journal",
5316            10,
5317            1_600_000_000_000_000,
5318        );
5319        write_named_netdata_test_journal(
5320            dir.path(),
5321            "current-window.journal",
5322            10,
5323            1_700_000_000_000_000,
5324        );
5325        let request = json!({
5326            "after": 1_700_000_000,
5327            "before": 1_700_000_010,
5328            "facets": ["SERVICE"],
5329            "histogram": "SERVICE",
5330            "last": 0
5331        });
5332        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5333        let mut reports = Vec::new();
5334        let mut progress = |progress: NetdataFunctionProgress| {
5335            reports.push((progress.current_file, progress.total_files));
5336        };
5337        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5338        options.progress_callback = Some(&mut progress);
5339
5340        let response = function
5341            .run_directory_request_json_with_options(dir.path(), &request, options)
5342            .expect("run function");
5343
5344        assert_eq!(response["status"], 200);
5345        assert_eq!(response["_journal_files"]["matched"], 1);
5346        assert_eq!(reports, vec![(1, 1)]);
5347    }
5348
5349    #[test]
5350    fn netdata_function_api_reports_cancellation() {
5351        let dir = TempDir::new().expect("tempdir");
5352        write_netdata_test_journal(dir.path(), 9_000);
5353        let request = json!({
5354            "after": 1_700_000_000,
5355            "before": 1_700_000_010,
5356            "facets": ["SERVICE"],
5357            "histogram": "SERVICE",
5358            "last": 0
5359        });
5360        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5361        let is_cancelled = || true;
5362        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5363        options.cancellation_callback = Some(&is_cancelled);
5364
5365        let response = function
5366            .run_directory_request_json_with_options(dir.path(), &request, options)
5367            .expect("run function");
5368
5369        assert_eq!(response["status"], 499);
5370        assert_eq!(response["errorMessage"], "Request cancelled.");
5371        assert_eq!(
5372            response.as_object().expect("response object").len(),
5373            2,
5374            "plugin-compatible function errors only include status and errorMessage"
5375        );
5376    }
5377
5378    #[test]
5379    fn netdata_function_api_cancels_during_active_scan() {
5380        let dir = TempDir::new().expect("tempdir");
5381        write_netdata_test_journal(dir.path(), 9_000);
5382        let request = json!({
5383            "after": 1_700_000_000,
5384            "before": 1_700_000_010,
5385            "facets": ["SERVICE"],
5386            "histogram": "SERVICE",
5387            "last": 0
5388        });
5389        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5390        let should_cancel = Cell::new(false);
5391        let mut reports = 0u64;
5392        let mut progress = |progress: NetdataFunctionProgress| {
5393            reports = reports.saturating_add(1);
5394            if progress.stats.rows_examined > 0 {
5395                should_cancel.set(true);
5396            }
5397        };
5398        let is_cancelled = || should_cancel.get();
5399        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5400        options.progress_interval = Duration::ZERO;
5401        options.progress_callback = Some(&mut progress);
5402        options.cancellation_callback = Some(&is_cancelled);
5403
5404        let response = function
5405            .run_directory_request_json_with_options(dir.path(), &request, options)
5406            .expect("run function");
5407
5408        assert_eq!(response["status"], 499);
5409        assert_eq!(response["errorMessage"], "Request cancelled.");
5410        assert!(reports > 0);
5411        assert!(should_cancel.get());
5412    }
5413
5414    #[test]
5415    fn netdata_function_api_honors_cancellation_after_final_file_progress() {
5416        let dir = TempDir::new().expect("tempdir");
5417        write_netdata_test_journal(dir.path(), 10);
5418        let request = json!({
5419            "after": 1_700_000_000,
5420            "before": 1_700_000_010,
5421            "facets": ["SERVICE"],
5422            "histogram": "SERVICE",
5423            "last": 0
5424        });
5425        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5426        let should_cancel = Cell::new(false);
5427        let mut progress = |_progress: NetdataFunctionProgress| {
5428            should_cancel.set(true);
5429        };
5430        let is_cancelled = || should_cancel.get();
5431        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5432        options.progress_callback = Some(&mut progress);
5433        options.cancellation_callback = Some(&is_cancelled);
5434
5435        let response = function
5436            .run_directory_request_json_with_options(dir.path(), &request, options)
5437            .expect("run function");
5438
5439        assert_eq!(response["status"], 499);
5440        assert_eq!(response["errorMessage"], "Request cancelled.");
5441        assert!(should_cancel.get());
5442    }
5443
5444    #[test]
5445    fn netdata_function_api_reports_timeout_as_partial_table() {
5446        let dir = TempDir::new().expect("tempdir");
5447        write_netdata_test_journal(dir.path(), 10);
5448        let request = json!({
5449            "after": 1_700_000_000,
5450            "before": 1_700_000_010,
5451            "facets": ["SERVICE"],
5452            "histogram": "SERVICE",
5453            "last": 0
5454        });
5455        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5456        let options = NetdataFunctionRunOptions {
5457            timeout: Some(Duration::ZERO),
5458            ..NetdataFunctionRunOptions::default()
5459        };
5460
5461        let response = function
5462            .run_directory_request_json_with_options(dir.path(), &request, options)
5463            .expect("run function");
5464
5465        assert_eq!(response["status"], 200);
5466        assert_eq!(response["partial"], true);
5467        assert_eq!(response["message"]["status"], "warning");
5468        assert_eq!(
5469            response["message"]["title"],
5470            "Query timed-out, incomplete data. "
5471        );
5472    }
5473
5474    #[test]
5475    fn netdata_function_api_reports_sampling_counters() {
5476        let dir = TempDir::new().expect("tempdir");
5477        write_stepped_netdata_test_journal(
5478            dir.path(),
5479            "netdata-api-test.journal",
5480            5_000,
5481            1_700_000_000_000_000,
5482            1_000,
5483        );
5484        let request = json!({
5485            "after": 1_700_000_000,
5486            "before": 1_700_000_005,
5487            "facets": ["SERVICE"],
5488            "histogram": "SERVICE",
5489            "last": 5,
5490            "sampling": 20
5491        });
5492        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5493
5494        let response = function
5495            .run_directory_request_json_with_options(
5496                dir.path(),
5497                &request,
5498                NetdataFunctionRunOptions::from_timeout_seconds(0),
5499            )
5500            .expect("run function");
5501
5502        assert_eq!(response["status"], 200);
5503        assert!(
5504            response["_sampling"]["sampled"]
5505                .as_u64()
5506                .unwrap_or_default()
5507                > 0
5508        );
5509        assert!(
5510            response["_sampling"]["unsampled"]
5511                .as_u64()
5512                .unwrap_or_default()
5513                > 0
5514        );
5515        assert!(
5516            response["_sampling"]["estimated"]
5517                .as_u64()
5518                .unwrap_or_default()
5519                > 0
5520        );
5521        assert_eq!(
5522            response["items"]["estimated"],
5523            response["_sampling"]["estimated"]
5524        );
5525        assert!(
5526            response["items"]["unsampled"].as_u64().unwrap_or_default()
5527                < response["_sampling"]["unsampled"]
5528                    .as_u64()
5529                    .unwrap_or_default()
5530        );
5531        assert_eq!(response["message"]["status"], "notice");
5532    }
5533
5534    #[test]
5535    fn netdata_function_api_disables_sampling_for_data_only() {
5536        let dir = TempDir::new().expect("tempdir");
5537        write_netdata_test_journal(dir.path(), 5_000);
5538        let request = json!({
5539            "after": 1_700_000_000,
5540            "before": 1_700_000_010,
5541            "data_only": true,
5542            "last": 5,
5543            "sampling": 20
5544        });
5545        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5546
5547        let response = function
5548            .run_directory_request_json_with_options(
5549                dir.path(),
5550                &request,
5551                NetdataFunctionRunOptions::from_timeout_seconds(0),
5552            )
5553            .expect("run function");
5554
5555        assert_eq!(response["status"], 200);
5556        assert!(response.get("_sampling").is_none());
5557    }
5558
5559    #[test]
5560    fn normalizes_missing_time_window_to_last_hour_like_plugin() {
5561        assert_eq!(
5562            normalize_time_window(1_000_000_000, None, None),
5563            (Some(999_996_400_000_000), Some(1_000_000_000_999_999))
5564        );
5565    }
5566
5567    #[test]
5568    fn normalizes_inverted_time_window_like_plugin() {
5569        assert_eq!(
5570            normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000)),
5571            (Some(200_000_000_000_000), Some(200_000_100_999_999))
5572        );
5573    }
5574
5575    #[test]
5576    fn normalizes_equal_time_window_like_plugin() {
5577        assert_eq!(
5578            normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000)),
5579            (Some(199_996_400_000_000), Some(200_000_000_999_999))
5580        );
5581    }
5582
5583    #[test]
5584    fn normalizes_relative_time_window_like_plugin() {
5585        assert_eq!(
5586            normalize_time_window(1_000_000_000, Some(100), Some(200)),
5587            (Some(999_999_701_000_000), Some(999_999_800_999_999))
5588        );
5589    }
5590
5591    #[test]
5592    fn normalizes_missing_after_with_supplied_before_like_plugin() {
5593        assert_eq!(
5594            normalize_time_window(1_000_000_000, None, Some(200_000_000)),
5595            (Some(199_999_401_000_000), Some(200_000_000_999_999))
5596        );
5597    }
5598
5599    #[test]
5600    fn systemd_profile_transforms_priority_and_facility_for_display() {
5601        let profile = SystemdJournalProfile;
5602        let context = DisplayContext::default();
5603        assert_eq!(
5604            profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7"),
5605            json!("debug")
5606        );
5607        assert_eq!(
5608            profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3"),
5609            json!("daemon")
5610        );
5611        assert_eq!(priority_to_row_severity(b"3"), "critical");
5612        assert_eq!(priority_to_row_severity(b"6"), "normal");
5613    }
5614
5615    #[test]
5616    fn dynamic_process_name_matches_plugin_fallback_order() {
5617        let mut fields = BTreeMap::new();
5618        fields.insert("SYSLOG_IDENTIFIER".to_string(), vec![b"syslog".to_vec()]);
5619        fields.insert("_COMM".to_string(), vec![b"comm".to_vec()]);
5620        fields.insert("_PID".to_string(), vec![b"42".to_vec()]);
5621        fields.insert("SYSLOG_PID".to_string(), vec![b"99".to_vec()]);
5622        assert_eq!(dynamic_process_name(&fields), "syslog[42]");
5623
5624        fields.insert("CONTAINER_NAME".to_string(), vec![b"container".to_vec()]);
5625        assert_eq!(dynamic_process_name(&fields), "container[42]");
5626
5627        fields.remove("CONTAINER_NAME");
5628        fields.remove("SYSLOG_IDENTIFIER");
5629        fields.remove("_PID");
5630        assert_eq!(dynamic_process_name(&fields), "comm[-]");
5631
5632        fields.insert("_PID".to_string(), vec![Vec::new()]);
5633        assert_eq!(dynamic_process_name(&fields), "comm");
5634
5635        fields.remove("_COMM");
5636        fields.remove("_PID");
5637        fields.insert("_EXE".to_string(), vec![b"/usr/bin/app".to_vec()]);
5638        assert_eq!(dynamic_process_name(&fields), "-");
5639    }
5640
5641    #[test]
5642    fn facet_values_are_truncated_and_collapsed_like_plugin() {
5643        let prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
5644        let mut first = prefix.clone();
5645        first.extend_from_slice(b"-first");
5646        let mut second = prefix.clone();
5647        second.extend_from_slice(b"-second");
5648
5649        let mut values = BTreeMap::new();
5650        add_netdata_facet_count(&mut values, &first, 2);
5651        add_netdata_facet_count(&mut values, &second, 3);
5652
5653        assert_eq!(values.len(), 1);
5654        assert_eq!(values.get(&prefix), Some(&5));
5655    }
5656
5657    #[test]
5658    fn histogram_values_are_truncated_and_collapsed_like_plugin() {
5659        let prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
5660        let mut first = prefix.clone();
5661        first.extend_from_slice(b"-first");
5662        let mut second = prefix.clone();
5663        second.extend_from_slice(b"-second");
5664
5665        let mut values = HashMap::new();
5666        values.insert(first, 2);
5667        values.insert(second, 3);
5668        let histogram = ExplorerHistogram {
5669            field: b"TEST_FIELD".to_vec(),
5670            buckets: vec![ExplorerHistogramBucket {
5671                start_realtime_usec: 1_000_000,
5672                end_realtime_usec: 2_000_000,
5673                values,
5674            }],
5675        };
5676
5677        let function = NetdataJournalFunction::systemd_journal();
5678        let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);
5679        let labels = rendered["chart"]["result"]["labels"]
5680            .as_array()
5681            .expect("labels");
5682        assert_eq!(labels.len(), 2);
5683        assert_eq!(labels[1], Value::String(String::from_utf8(prefix).unwrap()));
5684        assert_eq!(rendered["chart"]["result"]["data"][0][1][0], json!(5));
5685    }
5686
5687    #[test]
5688    fn histogram_chart_metadata_includes_empty_dimension_arrays() {
5689        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5690        let empty = function.build_histogram(
5691            &DisplayContext::default(),
5692            &ExplorerHistogram {
5693                field: b"TRAP_SEVERITY".to_vec(),
5694                buckets: vec![ExplorerHistogramBucket {
5695                    start_realtime_usec: 1_700_000_000_000_000,
5696                    end_realtime_usec: 1_700_000_005_000_000,
5697                    values: HashMap::new(),
5698                }],
5699            },
5700            None,
5701        );
5702
5703        assert_eq!(empty["chart"]["view"]["dimensions"]["names"], json!([]));
5704        assert_eq!(empty["chart"]["view"]["dimensions"]["ids"], json!([]));
5705        assert_eq!(empty["chart"]["db"]["dimensions"]["names"], json!([]));
5706
5707        let mut values = HashMap::new();
5708        values.insert(b"warning".to_vec(), 7);
5709        let with_value = function.build_histogram(
5710            &DisplayContext::default(),
5711            &ExplorerHistogram {
5712                field: b"TRAP_SEVERITY".to_vec(),
5713                buckets: vec![ExplorerHistogramBucket {
5714                    start_realtime_usec: 1_700_000_000_000_000,
5715                    end_realtime_usec: 1_700_000_005_000_000,
5716                    values,
5717                }],
5718            },
5719            None,
5720        );
5721
5722        assert_eq!(
5723            with_value["chart"]["view"]["dimensions"]["names"],
5724            json!(["warning"])
5725        );
5726        assert_eq!(
5727            with_value["chart"]["view"]["dimensions"]["sts"]["min"],
5728            json!([7])
5729        );
5730    }
5731
5732    #[test]
5733    fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
5734        let mut backward = vec![
5735            test_located_row(100),
5736            test_located_row(100),
5737            test_located_row(100),
5738            test_located_row(90),
5739        ];
5740        make_row_timestamps_unique(&mut backward, Direction::Backward);
5741        assert_eq!(
5742            backward
5743                .iter()
5744                .map(|row| row.row.realtime_usec)
5745                .collect::<Vec<_>>(),
5746            vec![100, 99, 98, 90]
5747        );
5748
5749        let mut forward = vec![
5750            test_located_row(90),
5751            test_located_row(100),
5752            test_located_row(100),
5753            test_located_row(100),
5754        ];
5755        make_row_timestamps_unique(&mut forward, Direction::Forward);
5756        assert_eq!(
5757            forward
5758                .iter()
5759                .map(|row| row.row.realtime_usec)
5760                .collect::<Vec<_>>(),
5761            vec![90, 100, 101, 102]
5762        );
5763    }
5764
5765    #[test]
5766    fn page_window_counts_forward_anchor_like_netdata_facets() {
5767        let config = NetdataFunctionConfig::systemd_journal();
5768        let request = NetdataRequest::parse(
5769            &json!({
5770                "after": 1_700_000_000,
5771                "before": 1_700_000_010,
5772                "anchor": 1_700_000_005_000_000u64,
5773                "direction": "forward",
5774                "last": 2
5775            }),
5776            &config,
5777        )
5778        .expect("parse request");
5779        let mut window = NetdataPageWindow::for_request(&request);
5780
5781        for realtime_usec in [
5782            1_700_000_003_000_000,
5783            1_700_000_004_000_000,
5784            1_700_000_006_000_000,
5785            1_700_000_007_000_000,
5786            1_700_000_008_000_000,
5787        ] {
5788            window.observe(realtime_usec);
5789        }
5790
5791        let counters = window.counters();
5792        assert_eq!(counters.matched, 3);
5793        assert_eq!(counters.before, 1);
5794        assert_eq!(counters.after, 2);
5795    }
5796
5797    #[test]
5798    fn page_window_counts_backward_anchor_like_netdata_facets() {
5799        let config = NetdataFunctionConfig::systemd_journal();
5800        let request = NetdataRequest::parse(
5801            &json!({
5802                "after": 1_700_000_000,
5803                "before": 1_700_000_010,
5804                "anchor": 1_700_000_008_000_000u64,
5805                "direction": "backward",
5806                "last": 2
5807            }),
5808            &config,
5809        )
5810        .expect("parse request");
5811        let mut window = NetdataPageWindow::for_request(&request);
5812
5813        for realtime_usec in [
5814            1_700_000_009_000_000,
5815            1_700_000_007_000_000,
5816            1_700_000_006_000_000,
5817            1_700_000_005_000_000,
5818        ] {
5819            window.observe(realtime_usec);
5820        }
5821
5822        let counters = window.counters();
5823        assert_eq!(counters.matched, 3);
5824        assert_eq!(counters.before, 1);
5825        assert_eq!(counters.after, 1);
5826    }
5827
5828    #[test]
5829    fn page_window_counts_tail_anchor_like_netdata_facets() {
5830        let config = NetdataFunctionConfig::systemd_journal();
5831        let request = NetdataRequest::parse(
5832            &json!({
5833                "after": 1_700_000_000,
5834                "before": 1_700_000_010,
5835                "anchor": 1_700_000_008_000_000u64,
5836                "if_modified_since": 1_700_000_008_000_000u64,
5837                "data_only": true,
5838                "tail": true,
5839                "last": 2
5840            }),
5841            &config,
5842        )
5843        .expect("parse request");
5844        let mut window = NetdataPageWindow::for_request(&request);
5845
5846        for realtime_usec in [
5847            1_700_000_009_000_000,
5848            1_700_000_008_000_000,
5849            1_700_000_007_000_000,
5850        ] {
5851            window.observe(realtime_usec);
5852        }
5853
5854        let counters = window.counters();
5855        assert_eq!(counters.matched, 1);
5856        assert_eq!(counters.before, 0);
5857        assert_eq!(counters.after, 2);
5858    }
5859
5860    #[test]
5861    fn netdata_function_tail_anchor_with_newer_filtered_out_rows_returns_empty_200() {
5862        let dir = TempDir::new().expect("tempdir");
5863        let start_realtime_usec = 1_700_000_000_000_000;
5864        write_stepped_netdata_test_journal(
5865            dir.path(),
5866            "netdata-api-test.journal",
5867            2,
5868            start_realtime_usec,
5869            1_000_000,
5870        );
5871        let request = json!({
5872            "after": 1_700_000_000,
5873            "before": 1_700_000_002,
5874            "anchor": start_realtime_usec,
5875            "if_modified_since": start_realtime_usec,
5876            "data_only": true,
5877            "tail": true,
5878            "last": 5,
5879            "selections": {
5880                "SERVICE": ["even"]
5881            }
5882        });
5883        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5884
5885        let response = function
5886            .run_directory_request_json_with_options(
5887                dir.path(),
5888                &request,
5889                NetdataFunctionRunOptions::from_timeout_seconds(0),
5890            )
5891            .expect("run function");
5892
5893        assert_eq!(response["status"], 200);
5894        assert_eq!(
5895            response_column_strings(&response, "MESSAGE"),
5896            Vec::<String>::new()
5897        );
5898    }
5899
5900    #[test]
5901    fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
5902        let dir = TempDir::new().expect("tempdir");
5903        let start_realtime_usec = 1_700_000_000_000_000;
5904        write_stepped_netdata_test_journal(
5905            dir.path(),
5906            "paging-contract.journal",
5907            7,
5908            start_realtime_usec,
5909            1,
5910        );
5911
5912        let backward = collect_netdata_pages(dir.path(), Direction::Backward, 2);
5913        assert_eq!(
5914            backward.messages,
5915            vec![
5916                "row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"
5917            ]
5918        );
5919        assert_unique_messages(&backward.messages);
5920        assert_eq!(
5921            backward.timestamps,
5922            vec![
5923                start_realtime_usec + 6,
5924                start_realtime_usec + 5,
5925                start_realtime_usec + 4,
5926                start_realtime_usec + 3,
5927                start_realtime_usec + 2,
5928                start_realtime_usec + 1,
5929                start_realtime_usec,
5930            ]
5931        );
5932
5933        let forward = collect_netdata_pages(dir.path(), Direction::Forward, 2);
5934        assert_eq!(
5935            forward.messages,
5936            vec![
5937                "row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"
5938            ]
5939        );
5940        assert_unique_messages(&forward.messages);
5941        assert_eq!(
5942            forward.timestamps,
5943            vec![
5944                start_realtime_usec + 1,
5945                start_realtime_usec,
5946                start_realtime_usec + 3,
5947                start_realtime_usec + 2,
5948                start_realtime_usec + 5,
5949                start_realtime_usec + 4,
5950                start_realtime_usec + 6,
5951            ]
5952        );
5953    }
5954
5955    #[test]
5956    fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
5957        let dir = TempDir::new().expect("tempdir");
5958        let start_realtime_usec = 1_700_000_000_000_000;
5959        write_stepped_netdata_test_journal(
5960            dir.path(),
5961            "tail-contract.journal",
5962            5,
5963            start_realtime_usec,
5964            1,
5965        );
5966        let anchor = start_realtime_usec + 2;
5967
5968        for requested_direction in [Direction::Backward, Direction::Forward] {
5969            let response = run_netdata_contract_request(
5970                dir.path(),
5971                json!({
5972                    "after": 1_700_000_000,
5973                    "before": 1_700_000_010,
5974                    "last": 5,
5975                    "direction": direction_name(requested_direction),
5976                    "data_only": true,
5977                    "tail": true,
5978                    "if_modified_since": anchor,
5979                    "anchor": anchor,
5980                }),
5981            );
5982            assert_eq!(response["status"], 200);
5983            assert_eq!(response["_request"]["direction"], "backward");
5984            assert_eq!(
5985                response_column_strings(&response, "MESSAGE"),
5986                vec!["row-4", "row-3"]
5987            );
5988            assert_eq!(
5989                response_column_u64s(&response, "timestamp"),
5990                vec![start_realtime_usec + 4, start_realtime_usec + 3]
5991            );
5992        }
5993
5994        let no_change = run_netdata_contract_request(
5995            dir.path(),
5996            json!({
5997                "after": 1_700_000_000,
5998                "before": 1_700_000_010,
5999                "last": 5,
6000                "direction": "backward",
6001                "data_only": true,
6002                "tail": true,
6003                "if_modified_since": start_realtime_usec + 4,
6004                "anchor": start_realtime_usec + 4,
6005            }),
6006        );
6007        assert_eq!(no_change["status"], 304);
6008        assert_eq!(
6009            no_change["errorMessage"],
6010            "No new data since the previous call."
6011        );
6012    }
6013
6014    #[test]
6015    fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
6016        let dir = TempDir::new().expect("tempdir");
6017        let start_realtime_usec = 1_700_000_000_000_000;
6018        write_stepped_netdata_test_journal(
6019            dir.path(),
6020            "delta-contract.journal",
6021            5,
6022            start_realtime_usec,
6023            1,
6024        );
6025        let anchor = start_realtime_usec + 1;
6026
6027        let response = run_netdata_contract_request(
6028            dir.path(),
6029            json!({
6030                "after": 1_700_000_000,
6031                "before": 1_700_000_010,
6032                "last": 2,
6033                "direction": "backward",
6034                "data_only": true,
6035                "delta": true,
6036                "tail": true,
6037                "if_modified_since": anchor,
6038                "anchor": anchor,
6039                "facets": ["SERVICE"],
6040                "histogram": "SERVICE",
6041            }),
6042        );
6043
6044        assert_eq!(response["status"], 200);
6045        assert_eq!(
6046            response_column_strings(&response, "MESSAGE"),
6047            vec!["row-4", "row-3"]
6048        );
6049        assert_eq!(
6050            response_facet_count(&response, "facets_delta", "SERVICE", "even"),
6051            2
6052        );
6053        assert_eq!(
6054            response_facet_count(&response, "facets_delta", "SERVICE", "odd"),
6055            1
6056        );
6057        assert_eq!(
6058            response_histogram_total(&response, "histogram_delta", "even"),
6059            2
6060        );
6061        assert_eq!(
6062            response_histogram_total(&response, "histogram_delta", "odd"),
6063            1
6064        );
6065        let items = response["items_delta"].as_object().expect("items_delta");
6066        assert_eq!(items["matched"], 3);
6067        assert_eq!(items["returned"], 2);
6068        assert_eq!(items["after"], 2);
6069    }
6070
6071    #[test]
6072    fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
6073        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);
6074
6075        assert_eq!(adjuster.adjust(10), 10);
6076        assert_eq!(adjuster.adjust(10), 11);
6077        assert_eq!(adjuster.adjust(10), 12);
6078    }
6079
6080    #[test]
6081    fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
6082        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);
6083
6084        assert_eq!(adjuster.adjust(10), 10);
6085        assert_eq!(adjuster.adjust(10), 9);
6086        assert_eq!(adjuster.adjust(10), 8);
6087    }
6088
6089    #[test]
6090    fn systemd_profile_keeps_user_group_ids_raw_by_default() {
6091        let context = DisplayContext::default();
6092        let profile = SystemdJournalProfile;
6093        assert_eq!(
6094            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6095            json!("0")
6096        );
6097        assert_eq!(
6098            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6099            json!("0")
6100        );
6101    }
6102
6103    #[cfg(unix)]
6104    #[test]
6105    fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
6106        let context = DisplayContext::default();
6107        let profile = SystemdJournalPluginProfile;
6108        assert_eq!(
6109            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6110            json!("root")
6111        );
6112        assert_eq!(
6113            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6114            json!("root")
6115        );
6116    }
6117
6118    #[cfg(unix)]
6119    #[test]
6120    fn plugin_compatible_profile_caches_user_group_resolution() {
6121        let context = DisplayContext::default();
6122        let profile = SystemdJournalPluginProfile;
6123
6124        assert_eq!(
6125            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6126            json!("root")
6127        );
6128        assert_eq!(
6129            profile.field_display_value(&context, DisplayScope::Data, "_UID", b"0"),
6130            json!("root")
6131        );
6132        assert_eq!(
6133            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6134            json!("root")
6135        );
6136        assert_eq!(
6137            profile.field_display_value(&context, DisplayScope::Data, "_GID", b"0"),
6138            json!("root")
6139        );
6140
6141        assert_eq!(context.uid_display_cache.borrow().len(), 1);
6142        assert_eq!(context.gid_display_cache.borrow().len(), 1);
6143    }
6144
6145    #[test]
6146    fn file_overlap_uses_netdata_max_realtime_slack() {
6147        let file_first_seconds = 200_000_000u64;
6148        let file_last_seconds = 200_000_100u64;
6149        let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
6150        let header = crate::FileHeader {
6151            signature: *b"LPKSHHRH",
6152            compatible_flags: 0,
6153            incompatible_flags: 0,
6154            state: 0,
6155            header_size: 0,
6156            n_entries: 0,
6157            head_entry_realtime: file_first_seconds * 1_000_000,
6158            tail_entry_realtime: file_last_seconds * 1_000_000,
6159            head_entry_seqnum: 0,
6160            tail_entry_seqnum: 0,
6161            tail_entry_boot_id: [0; 16],
6162            seqnum_id: [0; 16],
6163        };
6164        let config = NetdataFunctionConfig::systemd_journal();
6165
6166        let inside_slack = NetdataRequest::parse(
6167            &json!({
6168                "after": file_last_seconds + slack_seconds - 1,
6169                "before": file_last_seconds + slack_seconds + 500
6170            }),
6171            &config,
6172        )
6173        .expect("parse request");
6174        assert!(file_may_overlap_request(header, &inside_slack));
6175
6176        let outside_slack = NetdataRequest::parse(
6177            &json!({
6178                "after": file_last_seconds + slack_seconds + 1,
6179                "before": file_last_seconds + slack_seconds + 500
6180            }),
6181            &config,
6182        )
6183        .expect("parse request");
6184        assert!(!file_may_overlap_request(header, &outside_slack));
6185    }
6186
6187    #[test]
6188    fn journal_file_order_matches_plugin_comparator_shape() {
6189        let older = JournalFileOrderInfo {
6190            msg_first_realtime_usec: 100,
6191            msg_last_realtime_usec: 200,
6192            file_last_modified_usec: 200,
6193            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6194        };
6195        let newer = JournalFileOrderInfo {
6196            msg_first_realtime_usec: 100,
6197            msg_last_realtime_usec: 300,
6198            file_last_modified_usec: 100,
6199            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6200        };
6201        assert_eq!(
6202            compare_journal_file_order(&newer, &older, Direction::Backward),
6203            Ordering::Less
6204        );
6205        assert_eq!(
6206            compare_journal_file_order(&newer, &older, Direction::Forward),
6207            Ordering::Greater
6208        );
6209
6210        let newer_mtime = JournalFileOrderInfo {
6211            msg_first_realtime_usec: 100,
6212            msg_last_realtime_usec: 200,
6213            file_last_modified_usec: 300,
6214            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6215        };
6216        assert_eq!(
6217            compare_journal_file_order(&newer_mtime, &older, Direction::Backward),
6218            Ordering::Less
6219        );
6220
6221        let newer_first = JournalFileOrderInfo {
6222            msg_first_realtime_usec: 150,
6223            msg_last_realtime_usec: 200,
6224            file_last_modified_usec: 200,
6225            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6226        };
6227        assert_eq!(
6228            compare_journal_file_order(&newer_first, &older, Direction::Backward),
6229            Ordering::Less
6230        );
6231    }
6232
6233    #[test]
6234    fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
6235        let mut boot_first = BTreeMap::new();
6236        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 300);
6237        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 100);
6238        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 200);
6239
6240        assert_eq!(boot_first.get(b"boot-a".as_slice()), Some(&100));
6241    }
6242
6243    #[test]
6244    fn merge_histogram_rejects_inconsistent_bucket_shape() {
6245        let mut target = Some(ExplorerHistogram {
6246            field: b"PRIORITY".to_vec(),
6247            buckets: vec![ExplorerHistogramBucket {
6248                start_realtime_usec: 1_000_000,
6249                end_realtime_usec: 2_000_000,
6250                values: HashMap::new(),
6251            }],
6252        });
6253        let source = ExplorerHistogram {
6254            field: b"PRIORITY".to_vec(),
6255            buckets: vec![ExplorerHistogramBucket {
6256                start_realtime_usec: 1_000_000,
6257                end_realtime_usec: 1_500_000,
6258                values: HashMap::new(),
6259            }],
6260        };
6261
6262        let err = merge_histogram(&mut target, source).expect_err("shape mismatch");
6263        assert!(matches!(
6264            err,
6265            SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
6266        ));
6267    }
6268
6269    #[test]
6270    fn collect_journal_files_recurses_nested_directories() {
6271        let dir = TempDir::new().expect("tempdir");
6272        let nested = dir.path().join("machine").join("nested");
6273        write_named_netdata_test_journal(&nested, "system.journal", 1, 1_700_000_000_000_000);
6274
6275        let collection = collect_journal_files(dir.path()).expect("collect files");
6276
6277        assert_eq!(collection.files.len(), 1);
6278        assert_eq!(collection.skipped, 0);
6279        assert!(collection.errors.is_empty());
6280        assert_eq!(
6281            collection.files[0]
6282                .file_name()
6283                .and_then(|name| name.to_str()),
6284            Some("system.journal")
6285        );
6286    }
6287
6288    #[cfg(unix)]
6289    #[test]
6290    fn collect_journal_files_deduplicates_symlinked_directories() {
6291        let dir = TempDir::new().expect("tempdir");
6292        let real = dir.path().join("real");
6293        let link = dir.path().join("link");
6294        write_named_netdata_test_journal(&real, "system.journal", 1, 1_700_000_000_000_000);
6295        std::os::unix::fs::symlink(&real, &link).expect("symlink");
6296
6297        let collection = collect_journal_files(dir.path()).expect("collect files");
6298
6299        assert_eq!(collection.files.len(), 1);
6300        assert_eq!(collection.skipped, 0);
6301        assert!(collection.errors.is_empty());
6302    }
6303
6304    #[cfg(unix)]
6305    #[test]
6306    fn collect_journal_files_deduplicates_symlinked_files() {
6307        let dir = TempDir::new().expect("tempdir");
6308        write_named_netdata_test_journal(dir.path(), "system.journal", 1, 1_700_000_000_000_000);
6309        std::os::unix::fs::symlink(
6310            dir.path().join("system.journal"),
6311            dir.path().join("system-copy.journal"),
6312        )
6313        .expect("symlink");
6314
6315        let collection = collect_journal_files(dir.path()).expect("collect files");
6316
6317        assert_eq!(collection.files.len(), 1);
6318        assert_eq!(collection.skipped, 0);
6319        assert!(collection.errors.is_empty());
6320    }
6321
6322    #[cfg(unix)]
6323    #[test]
6324    fn collect_journal_files_reports_unreadable_subdirectories() {
6325        use std::os::unix::fs::PermissionsExt;
6326
6327        if unsafe { libc::geteuid() } == 0 {
6328            return;
6329        }
6330
6331        let dir = TempDir::new().expect("tempdir");
6332        std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
6333        let locked = dir.path().join("locked");
6334        std::fs::create_dir(&locked).expect("locked dir");
6335        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
6336            .expect("lock dir");
6337
6338        let collection = collect_journal_files(dir.path()).expect("collect files");
6339
6340        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
6341            .expect("unlock dir");
6342        assert_eq!(collection.files.len(), 1);
6343        assert_eq!(collection.skipped, 1);
6344        assert_eq!(collection.errors.len(), 1);
6345        assert!(collection.errors[0].contains("locked"));
6346    }
6347
6348    #[test]
6349    fn source_summary_fills_missing_caller_metadata_from_header() {
6350        let dir = TempDir::new().expect("tempdir");
6351        write_named_netdata_test_journal(dir.path(), "system.journal", 2, 1_700_000_000_000_000);
6352        let path = dir.path().join("system.journal");
6353        let mut state = TestNetdataState::default();
6354        state.metadata.insert(
6355            path.clone(),
6356            NetdataJournalFileMetadata {
6357                msg_first_realtime_usec: Some(1_699_999_999_000_000),
6358                ..NetdataJournalFileMetadata::default()
6359            },
6360        );
6361        let options = NetdataFunctionRunOptions {
6362            state: Some(&mut state),
6363            ..NetdataFunctionRunOptions::default()
6364        };
6365
6366        let summary = JournalSourceSummary::from_paths(&[path], ReaderOptions::default(), &options);
6367
6368        assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
6369        assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
6370    }
6371
6372    #[test]
6373    fn source_summary_coverage_is_off_below_one_second() {
6374        let first = 1_700_000_000_000_000;
6375        let mut summary = JournalSourceSummary {
6376            files: 1,
6377            total_size: 0,
6378            first_realtime_usec: Some(first),
6379            last_realtime_usec: Some(first + 999_999),
6380        };
6381
6382        assert!(summary.info().contains("covering off"));
6383
6384        summary.last_realtime_usec = Some(first + 1_000_000);
6385        assert!(summary.info().contains("covering 1s"));
6386    }
6387
6388    #[test]
6389    fn source_selection_echoes_and_filters_known_groups() {
6390        let config = NetdataFunctionConfig::systemd_journal();
6391        let request = NetdataRequest::parse(
6392            &json!({
6393                "selections": {
6394                    "__logs_sources": ["all-local-system-logs"]
6395                }
6396            }),
6397            &config,
6398        )
6399        .expect("parse source-filtered request");
6400
6401        assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
6402        assert_eq!(
6403            request.echo.get("source_type").and_then(Value::as_u64),
6404            Some(SOURCE_TYPE_LOCAL_SYSTEM)
6405        );
6406        assert!(
6407            request
6408                .echo
6409                .pointer("/selections/__logs_sources/0")
6410                .is_some_and(Value::is_null)
6411        );
6412        assert!(request.matches_source(Path::new("/var/log/journal/machine/system.journal"), None));
6413        assert!(!request.matches_source(
6414            Path::new("/var/log/journal/machine/user-1000.journal"),
6415            None
6416        ));
6417    }
6418
6419    #[test]
6420    fn source_selection_uses_caller_metadata_before_filename_fallback() {
6421        let dir = TempDir::new().expect("tempdir");
6422        write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
6423        let path = dir.path().join("user-1000.journal");
6424        let config = NetdataFunctionConfig::systemd_journal();
6425        let request = NetdataRequest::parse(
6426            &json!({
6427                "after": 1_700_000_000,
6428                "before": 1_700_000_001,
6429                "selections": {
6430                    "__logs_sources": ["all-local-system-logs"]
6431                }
6432            }),
6433            &config,
6434        )
6435        .expect("parse source-filtered request");
6436        assert!(!request.matches_source(&path, None));
6437
6438        let mut state = TestNetdataState::default();
6439        state.metadata.insert(
6440            path.clone(),
6441            NetdataJournalFileMetadata {
6442                source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
6443                source_name: Some("system-registry".to_string()),
6444                ..NetdataJournalFileMetadata::default()
6445            },
6446        );
6447        let options = NetdataFunctionRunOptions {
6448            state: Some(&mut state),
6449            ..NetdataFunctionRunOptions::default()
6450        };
6451        let selected =
6452            select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
6453
6454        assert_eq!(selected.files.len(), 1);
6455    }
6456
6457    #[test]
6458    fn netdata_function_state_receives_learned_source_realtime_delta() {
6459        let dir = TempDir::new().expect("tempdir");
6460        let commit_realtime_usec = 1_700_000_030_000_000;
6461        let source_realtime_usec = commit_realtime_usec - 30_000_000;
6462        write_source_realtime_delta_journal(
6463            dir.path(),
6464            "system.journal",
6465            commit_realtime_usec,
6466            source_realtime_usec,
6467        );
6468        let request = json!({
6469            "after": 1_700_000_029,
6470            "before": 1_700_000_031,
6471            "facets": ["SERVICE"],
6472            "histogram": "SERVICE",
6473            "last": 1,
6474            "sampling": 0
6475        });
6476        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
6477        let mut state = TestNetdataState::default();
6478        let options = NetdataFunctionRunOptions {
6479            state: Some(&mut state),
6480            ..NetdataFunctionRunOptions::from_timeout_seconds(0)
6481        };
6482
6483        let response = function
6484            .run_directory_request_json_with_options(dir.path(), &request, options)
6485            .expect("run function");
6486
6487        assert_eq!(response["status"], 200);
6488        assert_eq!(state.updates.len(), 1);
6489        assert_eq!(state.updates[0].1, 30_000_000);
6490    }
6491
6492    #[test]
6493    fn source_classification_matches_plugin_filename_shape() {
6494        assert_eq!(
6495            journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
6496            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
6497        );
6498        assert_eq!(
6499            journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
6500            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
6501        );
6502        assert_eq!(
6503            journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
6504            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
6505        );
6506        assert_eq!(
6507            journal_file_source_type(Path::new(
6508                "/var/log/journal/machine.namespace/system.journal"
6509            )),
6510            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
6511        );
6512        assert_eq!(
6513            journal_file_source_type(Path::new(
6514                "/var/log/journal/remote/remote-host-a@machine.journal"
6515            )),
6516            SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
6517        );
6518    }
6519
6520    #[test]
6521    fn exact_source_names_follow_plugin_prefixes() {
6522        assert_eq!(
6523            journal_file_exact_source_name(Path::new(
6524                "/var/log/journal/machine.namespace/system.journal"
6525            ))
6526            .as_deref(),
6527            Some("namespace-namespace")
6528        );
6529        assert_eq!(
6530            journal_file_exact_source_name(Path::new(
6531                "/var/log/journal/remote/remote-host-a@machine.journal"
6532            ))
6533            .as_deref(),
6534            Some("remote-host-a")
6535        );
6536        assert_eq!(
6537            journal_file_exact_source_name(Path::new(
6538                "/var/log/journal/remote/remote-host-b.journal~.zst"
6539            ))
6540            .as_deref(),
6541            Some("remote-host-b")
6542        );
6543    }
6544
6545    #[test]
6546    fn disposed_journal_extension_matches_plugin_scan_contract() {
6547        assert!(is_journal_file_name(Path::new("active.journal")));
6548        assert!(is_journal_file_name(Path::new("archived.journal~")));
6549        assert!(is_journal_file_name(Path::new("active.journal.zst")));
6550        assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
6551    }
6552
6553    fn test_located_row(realtime_usec: u64) -> LocatedRow {
6554        LocatedRow {
6555            file_path: PathBuf::from("test.journal"),
6556            row: ExplorerRow {
6557                realtime_usec,
6558                cursor: String::new(),
6559                payloads: Vec::new(),
6560            },
6561        }
6562    }
6563
6564    fn write_source_realtime_delta_journal(
6565        directory: &std::path::Path,
6566        name: &str,
6567        commit_realtime_usec: u64,
6568        source_realtime_usec: u64,
6569    ) {
6570        std::fs::create_dir_all(directory).expect("create journal dir");
6571        let path = directory.join(name);
6572        let repo_file = RepoFile::from_path(&path).expect("repo file");
6573        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6574        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6575        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6576        let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6577        let payloads: [&[u8]; 4] = [
6578            b"MESSAGE=source lag test".as_slice(),
6579            b"SERVICE=delta".as_slice(),
6580            b"PRIORITY=6".as_slice(),
6581            source.as_bytes(),
6582        ];
6583        writer
6584            .add_entry(
6585                &mut file,
6586                &payloads,
6587                commit_realtime_usec,
6588                commit_realtime_usec,
6589            )
6590            .expect("write entry");
6591        file.sync().expect("sync journal");
6592    }
6593}