Skip to main content

journal/
netdata.rs

1use crate::explorer::{ExplorerSamplingState, histogram_bucket_count_for_query};
2use crate::{
3    Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
4    ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
5    ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
6    FileReader, ReaderOptions, Result, SdkError,
7};
8use chrono::{DateTime, Utc};
9use serde_json::{Map, Value, json};
10use std::cell::RefCell;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
13#[cfg(unix)]
14use std::ffi::CStr;
15use std::path::{Path, PathBuf};
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
/// Function name used by `NetdataFunctionConfig::systemd_journal`.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
// NOTE(review): the next few defaults are consumed outside this view; the descriptions are
// inferred from their names — confirm against their use sites.
/// Presumably the default number of rows returned when the request does not set `last`.
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
/// Presumably the default query window (one hour) when `after`/`before` are not given.
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
/// Presumably the default `sampling` budget (rows) when the request does not set one.
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
/// Presumably how many rows a `data_only` scan processes between stop/limit checks.
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
/// Three years in seconds; presumably `after`/`before` values up to this magnitude are
/// interpreted as relative to "now" — TODO confirm.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
/// Ten minutes in seconds; presumably the relative window used when `after` is missing.
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
/// Presumably the default number of histogram buckets.
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
/// 100 years in seconds: substituted for a timeout of `0` so "no timeout" still yields a
/// concrete `Duration`.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
/// 5 s: journal-vs-realtime delta passed to the request-wide explorer query.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
/// 2 min: presumably the upper bound accepted for a per-file journal-vs-realtime delta.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
/// Facet option `id` emitted for a zero-count empty value (see `build_facets`).
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
/// Facet option `name` emitted for a zero-count empty value (see `build_facets`).
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
/// Presumably the longest facet value kept verbatim, in bytes.
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
/// Presumably the directory recursion depth limit for journal-file discovery.
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
/// Presumably the cap on directories visited during journal-file discovery.
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Source-type bit flags (one bit each) describing where a journal file comes from.
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;

// Public aliases of the private source-type flags, for callers that fill in
// `NetdataJournalFileMetadata::source_type`.
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
49
/// Request parameter names this function understands.
// NOTE(review): presumably merged with facet field names by `accepted_params_from_fields`
// (defined outside this view) to build the advertised `accepted_params` list.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];

/// Default column order for the systemd-journal function. `build_columns` places these
/// right after the fixed `timestamp` and `rowOptions` columns. The order is significant.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];

/// Default facet fields for the systemd-journal function. They are copied into
/// `NetdataFunctionConfig::default_facets`.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
154
/// Static configuration of a Netdata journal function.
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Name of the Netdata function (e.g. `"systemd-journal"`).
    pub function_name: String,
    /// Facet fields used by default.
    // NOTE(review): presumably applied when the request does not list `facets` — confirm in
    // `NetdataRequest::parse`.
    pub default_facets: Vec<String>,
    /// Columns placed right after `timestamp`/`rowOptions` in the response table.
    pub default_view_keys: Vec<String>,
    /// Field charted by default; presumably used when the request omits `histogram`.
    pub default_histogram: Option<String>,
    /// Options used whenever a journal file is opened or scanned.
    pub reader_options: ReaderOptions,
    /// Strategy passed to the explorer for every file.
    pub explorer_strategy: ExplorerStrategy,
}
164
165impl NetdataFunctionConfig {
166    pub fn systemd_journal() -> Self {
167        Self {
168            function_name: DEFAULT_FUNCTION_NAME.to_string(),
169            default_facets: SYSTEMD_DEFAULT_FACETS
170                .iter()
171                .map(|field| (*field).to_string())
172                .collect(),
173            default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
174                .iter()
175                .map(|field| (*field).to_string())
176                .collect(),
177            default_histogram: Some("PRIORITY".to_string()),
178            reader_options: ReaderOptions::snapshot(),
179            explorer_strategy: ExplorerStrategy::Traversal,
180        }
181    }
182}
183
184impl Default for NetdataFunctionConfig {
185    fn default() -> Self {
186        Self::systemd_journal()
187    }
188}
189
190#[derive(Debug, Default)]
191pub struct DisplayContext {
192    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
193    uid_display_cache: RefCell<BTreeMap<String, String>>,
194    gid_display_cache: RefCell<BTreeMap<String, String>>,
195}
196
197#[derive(Debug, Clone, Copy)]
198pub enum DisplayScope {
199    Data,
200    Facet,
201    Histogram,
202}
203
204pub trait NetdataFunctionProfile {
205    fn field_display_value(
206        &self,
207        _context: &DisplayContext,
208        _scope: DisplayScope,
209        _field: &str,
210        value: &[u8],
211    ) -> Value {
212        Value::String(String::from_utf8_lossy(value).into_owned())
213    }
214
215    fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
216        match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
217            Value::String(value) => value,
218            other => other.to_string(),
219        }
220    }
221
222    fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
223        if let Some(priority) = first_value(fields, "PRIORITY") {
224            return json!({ "severity": priority_to_row_severity(priority) });
225        }
226        json!({ "severity": "normal" })
227    }
228}
229
230#[derive(Debug, Clone, Copy, Default)]
231pub struct SystemdJournalProfile;
232
233#[derive(Debug, Clone, Copy, Default)]
234pub struct SystemdJournalPluginProfile;
235
236impl NetdataFunctionProfile for SystemdJournalProfile {
237    fn field_display_value(
238        &self,
239        context: &DisplayContext,
240        scope: DisplayScope,
241        field: &str,
242        value: &[u8],
243    ) -> Value {
244        systemd_field_display_value(context, scope, field, value, false)
245    }
246}
247
248impl NetdataFunctionProfile for SystemdJournalPluginProfile {
249    fn field_display_value(
250        &self,
251        context: &DisplayContext,
252        scope: DisplayScope,
253        field: &str,
254        value: &[u8],
255    ) -> Value {
256        systemd_field_display_value(context, scope, field, value, true)
257    }
258}
259
260#[derive(Debug, Clone)]
261pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
262    config: NetdataFunctionConfig,
263    profile: P,
264}
265
266#[derive(Debug, Clone)]
267pub struct NetdataFunctionProgress {
268    pub current_file: usize,
269    pub total_files: usize,
270    pub matched_files: u64,
271    pub skipped_files: u64,
272    pub stats: ExplorerStats,
273    pub elapsed: Duration,
274}
275
276#[derive(Debug, Clone, Default, PartialEq, Eq)]
277pub struct NetdataJournalFileMetadata {
278    pub source_type: Option<u64>,
279    pub source_name: Option<String>,
280    pub file_last_modified_usec: Option<u64>,
281    pub msg_first_realtime_usec: Option<u64>,
282    pub msg_last_realtime_usec: Option<u64>,
283    pub journal_vs_realtime_delta_usec: Option<u64>,
284}
285
286pub trait NetdataFunctionState {
287    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
288        None
289    }
290
291    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
292}
293
294pub struct NetdataFunctionRunOptions<'a> {
295    pub timeout: Option<Duration>,
296    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
297    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
298    pub state: Option<&'a mut dyn NetdataFunctionState>,
299    pub progress_interval: Duration,
300}
301
302impl NetdataFunctionRunOptions<'_> {
303    pub fn from_timeout_seconds(seconds: u64) -> Self {
304        let seconds = if seconds == 0 {
305            EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
306        } else {
307            seconds
308        };
309        Self {
310            timeout: Some(Duration::from_secs(seconds)),
311            progress_callback: None,
312            cancellation_callback: None,
313            state: None,
314            progress_interval: Duration::from_millis(250),
315        }
316    }
317}
318
319impl Default for NetdataFunctionRunOptions<'_> {
320    fn default() -> Self {
321        Self {
322            timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
323            progress_callback: None,
324            cancellation_callback: None,
325            state: None,
326            progress_interval: Duration::from_millis(250),
327        }
328    }
329}
330
331impl NetdataJournalFunction<SystemdJournalProfile> {
332    pub fn systemd_journal() -> Self {
333        Self {
334            config: NetdataFunctionConfig::systemd_journal(),
335            profile: SystemdJournalProfile,
336        }
337    }
338}
339
340impl NetdataJournalFunction<SystemdJournalPluginProfile> {
341    pub fn systemd_journal_plugin_compatible() -> Self {
342        Self {
343            config: NetdataFunctionConfig::systemd_journal(),
344            profile: SystemdJournalPluginProfile,
345        }
346    }
347}
348
349impl<P> NetdataJournalFunction<P>
350where
351    P: NetdataFunctionProfile,
352{
353    pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
354        Self { config, profile }
355    }
356
357    pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
358        self.run_directory_request_json_with_options(
359            directory,
360            request,
361            NetdataFunctionRunOptions::default(),
362        )
363    }
364
365    pub fn run_directory_request_json_with_options(
366        &self,
367        directory: &Path,
368        request: &Value,
369        mut options: NetdataFunctionRunOptions<'_>,
370    ) -> Result<Value> {
371        let request = NetdataRequest::parse(request, &self.config)?;
372        let collection = collect_journal_files(directory)?;
373        let paths = collection.files;
374        if request.info {
375            return Ok(self.info_response(request.echo, &paths, &options));
376        }
377        let annotation_paths = paths.clone();
378
379        let selected =
380            select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
381        if let Some(response) = not_modified_before_scan_response(&request, &selected) {
382            return Ok(response);
383        }
384        let selected_files = selected.files;
385        let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
386        let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
387        self.finalize_combined_result(
388            &request,
389            &selected_files,
390            deadline,
391            &mut options,
392            &mut combined,
393            collection.skipped,
394            collection.errors,
395        )?;
396        Ok(self.query_response(request, &annotation_paths, combined))
397    }
398
399    fn finalize_combined_result(
400        &self,
401        request: &NetdataRequest,
402        selected_files: &[SelectedJournalFile],
403        deadline: Option<Instant>,
404        options: &mut NetdataFunctionRunOptions<'_>,
405        combined: &mut CombinedResult,
406        skipped_files: u64,
407        file_errors: Vec<String>,
408    ) -> Result<()> {
409        combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
410        combined.file_errors.extend(file_errors);
411        if combined.cancelled {
412            return Ok(());
413        }
414        if !request.data_only {
415            combined.add_zero_count_facet_values_from_files(
416                &request.facets,
417                self.config.reader_options,
418            );
419            combined.add_zero_count_selected_filter_values(request);
420        }
421        if should_collect_unfiltered_facet_vocabulary(request, combined) {
422            let vocabulary = self.explore_files(
423                selected_files,
424                &request.unfiltered_vocabulary(),
425                deadline,
426                options,
427            )?;
428            combined.add_zero_count_facet_values(&vocabulary.facets);
429        }
430        Ok(())
431    }
432
433    pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
434        self.run_directory_request_bytes_with_options(
435            directory,
436            request,
437            NetdataFunctionRunOptions::default(),
438        )
439    }
440
441    pub fn run_directory_request_bytes_with_options(
442        &self,
443        directory: &Path,
444        request: &[u8],
445        options: NetdataFunctionRunOptions<'_>,
446    ) -> Result<Value> {
447        let request: Value = serde_json::from_slice(request).map_err(|err| {
448            SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
449        })?;
450        self.run_directory_request_json_with_options(directory, &request, options)
451    }
452
453    fn explore_files(
454        &self,
455        files: &[SelectedJournalFile],
456        request: &NetdataRequest,
457        deadline: Option<Instant>,
458        options: &mut NetdataFunctionRunOptions<'_>,
459    ) -> Result<CombinedResult> {
460        let query = request.to_explorer_query(
461            files.len() as u64,
462            None,
463            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
464        );
465        let mut combined = CombinedResult::default();
466        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
467        combined.sampling_enabled = query.sampling.is_some();
468        let mut sampling_state =
469            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
470        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
471        let started = Instant::now();
472        let total_files = files.len();
473        for (file_index, file) in files.iter().enumerate() {
474            let path = &file.path;
475            if should_stop_before_file(&mut combined, deadline, options) {
476                break;
477            }
478            let Some(mut reader) = self.open_file_for_explore(
479                path,
480                &mut combined,
481                options,
482                progress_context(file_index, total_files, started),
483            )?
484            else {
485                continue;
486            };
487            combined.matched_files = combined.matched_files.saturating_add(1);
488            combined.matched_paths.push(path.clone());
489            let query = request.file_query(files.len(), reader.header(), &file.order);
490            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
491            let explored = self.explore_single_file(
492                &mut reader,
493                request,
494                &query,
495                deadline,
496                options,
497                &combined,
498                &page_window,
499                sampling_state.as_mut(),
500                &realtime_adjuster,
501                progress_context(file_index, total_files, started),
502                file.order.journal_vs_realtime_delta_usec,
503            );
504            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
505            else {
506                continue;
507            };
508            if finish_explored_file(
509                options,
510                request,
511                file,
512                &query,
513                result,
514                stop_reason,
515                &mut combined,
516                files,
517                file_index,
518                progress_context(file_index, total_files, started),
519            )? {
520                break;
521            }
522        }
523        combined.expand_row_payloads(self.config.reader_options);
524        combined.page_counters = Some(page_window.into_inner().counters());
525        Ok(combined)
526    }
527
528    fn open_file_for_explore(
529        &self,
530        path: &Path,
531        combined: &mut CombinedResult,
532        options: &mut NetdataFunctionRunOptions<'_>,
533        progress: ProgressContext,
534    ) -> Result<Option<FileReader>> {
535        match FileReader::open_with_options(path, self.config.reader_options) {
536            Ok(reader) => Ok(Some(reader)),
537            Err(err) => {
538                combined.skipped_files = combined.skipped_files.saturating_add(1);
539                combined
540                    .file_errors
541                    .push(format!("{}: {err}", path.display()));
542                emit_progress_for_combined(options, combined, progress);
543                Ok(None)
544            }
545        }
546    }
547
548    #[allow(clippy::too_many_arguments)]
549    fn explore_single_file(
550        &self,
551        reader: &mut FileReader,
552        request: &NetdataRequest,
553        query: &ExplorerQuery,
554        deadline: Option<Instant>,
555        options: &mut NetdataFunctionRunOptions<'_>,
556        combined: &CombinedResult,
557        page_window: &RefCell<NetdataPageWindow>,
558        sampling_state: Option<&mut ExplorerSamplingState>,
559        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
560        progress: ProgressContext,
561        realtime_delta_usec: u64,
562    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
563        let cancellation_callback = options.cancellation_callback;
564        let progress_interval = options.progress_interval;
565        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
566            emit_explorer_progress(options, combined, explorer_progress, progress);
567        };
568        let mut control = ExplorerControl::new();
569        control.set_deadline(deadline);
570        control.set_cancellation_callback(cancellation_callback);
571        control.set_progress_interval(progress_interval);
572        control.set_progress_callback(Some(&mut explorer_progress));
573        control.set_sampling_state(sampling_state);
574        let mut candidate_row =
575            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
576        control.set_candidate_row_callback(Some(&mut candidate_row));
577        let mut adjust_realtime =
578            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
579        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
580        let mut matched_row = |realtime_usec, rows_matched| {
581            delta_scan_can_stop(
582                request,
583                page_window,
584                realtime_usec,
585                rows_matched,
586                realtime_delta_usec,
587            )
588        };
589        control.set_matched_row_callback(Some(&mut matched_row));
590        let result = reader.explore_with_strategy_cursor_rows_controlled(
591            query,
592            self.config.explorer_strategy,
593            &mut control,
594        )?;
595        Ok((result, control.stop_reason()))
596    }
597
598    fn info_response(
599        &self,
600        echo: Value,
601        paths: &[PathBuf],
602        options: &NetdataFunctionRunOptions<'_>,
603    ) -> Value {
604        json!({
605            "_request": echo,
606            "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
607            "v": 3,
608            "accepted_params": self.accepted_params_from_fields(&[]),
609            "required_params": self.required_source_params(paths, options),
610            "show_ids": true,
611            "has_history": true,
612            "pagination": {
613                "enabled": true,
614                "key": "anchor",
615                "column": "timestamp",
616                "units": "timestamp_usec",
617            },
618            "status": 200,
619            "type": "table",
620            "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
621        })
622    }
623
624    fn query_response(
625        &self,
626        request: NetdataRequest,
627        annotation_paths: &[PathBuf],
628        combined: CombinedResult,
629    ) -> Value {
630        // Match Netdata systemd-journal.plugin: if files are newer but no
631        // useful rows survive the query, the function returns 304.
632        let not_modified = request.if_modified_since_usec != 0
633            && !combined.partial
634            && combined.stats.rows_matched == 0;
635        if combined.cancelled {
636            return netdata_function_error(499, "Request cancelled.");
637        }
638        if not_modified {
639            return netdata_function_error(304, "No new data since the previous call.");
640        }
641        let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
642        let mut response = base_query_response(&request, &combined, &artifacts);
643        let Some(object) = response.as_object_mut() else {
644            return netdata_function_error(500, "Internal Netdata function response error.");
645        };
646        self.add_query_response_metadata(object, &request, &combined, artifacts);
647        response
648    }
649
650    fn query_response_artifacts(
651        &self,
652        request: &NetdataRequest,
653        annotation_paths: &[PathBuf],
654        combined: &CombinedResult,
655    ) -> QueryResponseArtifacts {
656        let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
657        let reportable_facet_field_names = string_fields(&reportable_facet_fields);
658        let columns = self.build_columns(
659            &request,
660            &reportable_facet_fields,
661            &combined.rows,
662            &combined.facets,
663            &combined.column_fields,
664        );
665        let boot_ids = response_boot_ids(
666            &columns.order,
667            &combined.rows,
668            &combined.facets,
669            combined.histogram.as_ref(),
670        );
671        let context = DisplayContext {
672            boot_first_realtime: collect_boot_first_realtime(
673                annotation_paths,
674                self.config.reader_options,
675                &boot_ids,
676            ),
677            ..DisplayContext::default()
678        };
679        let data =
680            self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
681        let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
682        let histogram = combined.histogram.as_ref().map(|histogram| {
683            self.build_histogram(&context, histogram, combined.facets.get(&histogram.field))
684        });
685        let message = query_message(combined.timed_out, &combined.stats);
686        let items = response_items(request, combined, data.len() as u64);
687        QueryResponseArtifacts {
688            reportable_facet_field_names,
689            columns,
690            data,
691            facets,
692            histogram,
693            message,
694            items,
695        }
696    }
697
698    fn add_query_response_metadata(
699        &self,
700        object: &mut Map<String, Value>,
701        request: &NetdataRequest,
702        combined: &CombinedResult,
703        artifacts: QueryResponseArtifacts,
704    ) {
705        if !request.data_only {
706            self.add_full_query_response_metadata(object, request, combined, &artifacts);
707        } else if request.histogram.is_some() {
708            object.insert(
709                "available_histograms".to_string(),
710                self.available_histograms(request, combined),
711            );
712        }
713        add_last_modified_if_needed(object, request, combined);
714        add_sampling_if_needed(object, combined);
715        add_analysis_outputs_if_needed(object, request, artifacts);
716    }
717
718    fn add_full_query_response_metadata(
719        &self,
720        object: &mut Map<String, Value>,
721        request: &NetdataRequest,
722        combined: &CombinedResult,
723        artifacts: &QueryResponseArtifacts,
724    ) {
725        object.insert("message".to_string(), artifacts.message.clone());
726        object.insert("update_every".to_string(), Value::from(1));
727        object.insert("help".to_string(), Value::Null);
728        object.insert(
729            "accepted_params".to_string(),
730            self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
731        );
732        object.insert("default_sort_column".to_string(), Value::from("timestamp"));
733        object.insert("default_charts".to_string(), Value::Array(Vec::new()));
734        object.insert(
735            "available_histograms".to_string(),
736            self.available_histograms(request, combined),
737        );
738    }
739
740    fn build_columns(
741        &self,
742        request: &NetdataRequest,
743        reportable_facet_fields: &[Vec<u8>],
744        rows: &[LocatedRow],
745        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
746        column_fields: &BTreeSet<String>,
747    ) -> Columns {
748        let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
749        push_unique_many(&mut order, &self.config.default_view_keys);
750        push_unique_many(&mut order, &string_fields(reportable_facet_fields));
751        if let Some(histogram) = &request.histogram {
752            push_unique(&mut order, histogram);
753        }
754        for field in column_fields {
755            push_unique(&mut order, field);
756        }
757
758        for (field, values) in facets {
759            if !facet_group_is_reportable(values) {
760                continue;
761            }
762            push_unique(&mut order, &String::from_utf8_lossy(field));
763        }
764        for row in rows {
765            let fields = row_fields(row);
766            for field in fields.keys() {
767                push_unique(&mut order, field);
768            }
769        }
770
771        let mut map = Map::new();
772        for (index, key) in order.iter().enumerate() {
773            map.insert(key.clone(), column_metadata(key, index));
774        }
775        Columns { order, map }
776    }
777
778    fn build_data_rows(
779        &self,
780        context: &DisplayContext,
781        column_order: &[String],
782        rows: &[LocatedRow],
783        direction: Direction,
784    ) -> Vec<Value> {
785        let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
786            Direction::Forward => Box::new(rows.iter().rev()),
787            Direction::Backward => Box::new(rows.iter()),
788        };
789        row_iter
790            .map(|located| {
791                let fields = row_fields(located);
792                let mut row = Vec::with_capacity(column_order.len());
793                for column in column_order {
794                    let value = match column.as_str() {
795                        "timestamp" => Value::from(located.row.realtime_usec),
796                        "rowOptions" => self.profile.row_options(&fields),
797                        field => first_value(&fields, field)
798                            .map(|value| {
799                                self.profile.field_display_value(
800                                    context,
801                                    DisplayScope::Data,
802                                    field,
803                                    value,
804                                )
805                            })
806                            .unwrap_or(Value::Null),
807                    };
808                    row.push(value);
809                }
810                Value::Array(row)
811            })
812            .collect()
813    }
814
815    fn build_facets(
816        &self,
817        context: &DisplayContext,
818        requested: &[Vec<u8>],
819        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
820    ) -> Value {
821        let mut out = Vec::new();
822        for (order, field) in requested.iter().enumerate() {
823            let values = facets.get(field);
824            let field_name = String::from_utf8_lossy(field).into_owned();
825            let mut options: Vec<_> = values
826                .into_iter()
827                .flat_map(|values| values.iter())
828                .filter(|(value, count)| {
829                    (!value.is_empty() && value.as_slice() != b"-")
830                        || (**count == 0 && value.is_empty())
831                })
832                .map(|(value, count)| {
833                    if *count == 0 && value.is_empty() {
834                        return json!({
835                            "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
836                            "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
837                            "count": count,
838                        });
839                    }
840                    json!({
841                        "id": String::from_utf8_lossy(value).into_owned(),
842                        "name": self.profile.facet_option_name(context, &field_name, value),
843                        "count": count,
844                    })
845                })
846                .collect();
847            sort_facet_options(&field_name, &mut options);
848            for (idx, option) in options.iter_mut().enumerate() {
849                if let Some(object) = option.as_object_mut() {
850                    object.insert("order".to_string(), Value::from((idx + 1) as u64));
851                }
852            }
853            out.push(json!({
854                "id": field_name,
855                "name": String::from_utf8_lossy(field).into_owned(),
856                "order": order + 1,
857                "options": options,
858            }));
859        }
860        Value::Array(out)
861    }
862
863    fn build_histogram(
864        &self,
865        context: &DisplayContext,
866        histogram: &ExplorerHistogram,
867        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
868    ) -> Value {
869        let field = String::from_utf8_lossy(&histogram.field).into_owned();
870        let mut dimension_ids = BTreeSet::new();
871        let mut buckets = Vec::with_capacity(histogram.buckets.len());
872        for bucket in &histogram.buckets {
873            let mut values = BTreeMap::new();
874            for (value, count) in &bucket.values {
875                add_netdata_facet_count(&mut values, value, *count);
876            }
877            for value in values.keys() {
878                dimension_ids.insert(value.clone());
879            }
880            buckets.push((bucket.start_realtime_usec, values));
881        }
882        let actual_dimension_ids = dimension_ids.clone();
883        if let Some(known_values) = known_values {
884            for value in known_values.keys() {
885                if value.is_empty() || value.as_slice() == b"-" {
886                    continue;
887                }
888                dimension_ids.insert(value.clone());
889            }
890        }
891        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
892        let metadata = self.histogram_chart_metadata(
893            context,
894            &field,
895            &buckets,
896            &actual_dimension_ids,
897            &dimension_ids,
898        );
899        let data: Vec<Value> = buckets
900            .iter()
901            .map(|(start_realtime_usec, values)| {
902                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
903                point.push(Value::from(start_realtime_usec / 1000));
904                for value in &dimension_ids {
905                    let count = values
906                        .get(value)
907                        .copied()
908                        .map(Value::from)
909                        .unwrap_or_else(|| {
910                            if actual_dimension_ids.contains(value) {
911                                Value::from(0)
912                            } else {
913                                Value::Null
914                            }
915                        });
916                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
917                }
918                Value::Array(point)
919            })
920            .collect();
921
922        json!({
923            "id": field,
924            "name": field,
925            "chart": {
926                "summary": metadata.summary,
927                "totals": metadata.totals,
928                "result": {
929                    "labels": metadata.result_labels,
930                    "point": { "value": 0, "arp": 1, "pa": 2 },
931                    "data": data,
932                },
933                "db": {
934                    "tiers": 1,
935                    "update_every": histogram_update_every_seconds(histogram),
936                    "units": "events",
937                    "dimensions": {
938                        "ids": metadata.ids.clone(),
939                        "names": metadata.names.clone(),
940                        "units": metadata.units.clone(),
941                        "sts": metadata.stats.clone(),
942                    },
943                    "per_tier": [{
944                        "tier": 0,
945                        "queries": 1,
946                        "points": metadata.points,
947                        "update_every": histogram_update_every_seconds(histogram),
948                    }],
949                },
950                "view": {
951                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
952                    "update_every": histogram_update_every_seconds(histogram),
953                    "after": histogram_after_seconds(histogram),
954                    "before": histogram_before_seconds(histogram),
955                    "units": "events",
956                    "chart_type": "stackedBar",
957                    "dimensions": {
958                        "grouped_by": ["dimension"],
959                        "ids": metadata.ids,
960                        "names": metadata.names,
961                        "colors": metadata.colors,
962                        "units": metadata.units,
963                        "sts": metadata.stats,
964                    },
965                    "min": metadata.min,
966                    "max": metadata.max,
967                },
968                "agents": [{
969                    "mg": "default",
970                    "nm": "facets.histogram",
971                    "now": now_unix_seconds(),
972                    "ai": 0,
973                }],
974            }
975        })
976    }
977
    /// Computes per-dimension labels, statistics and the chart summary for a
    /// histogram.
    ///
    /// `dimensions` is the full, ordered list of series to emit.
    /// `actual_dimensions` holds the subset that occurs in at least one bucket.
    /// Dimensions outside that subset get zeroed statistics and `ds.sl`/`ds.qr`
    /// of 0. Only actual dimensions contribute to the global `min`, `max` and
    /// `points`.
    fn histogram_chart_metadata(
        &self,
        context: &DisplayContext,
        field: &str,
        buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
        actual_dimensions: &BTreeSet<Vec<u8>>,
        dimensions: &[Vec<u8>],
    ) -> HistogramChartMetadata {
        let mut ids = Vec::with_capacity(dimensions.len());
        let mut names = Vec::with_capacity(dimensions.len());
        let mut colors = Vec::with_capacity(dimensions.len());
        let mut units = Vec::with_capacity(dimensions.len());
        let mut min_values = Vec::with_capacity(dimensions.len());
        let mut max_values = Vec::with_capacity(dimensions.len());
        let mut avg_values = Vec::with_capacity(dimensions.len());
        let mut arp_values = Vec::with_capacity(dimensions.len());
        let mut con_values = Vec::with_capacity(dimensions.len());
        let mut summary_dimensions = Vec::with_capacity(dimensions.len());
        // The first result label is the time column; one label per dimension follows.
        let mut result_labels = vec![Value::String("time".to_string())];

        // Grand total of all counts across every bucket and dimension; used as
        // the denominator for each dimension's contribution percentage.
        let total: u64 = dimensions
            .iter()
            .map(|dimension| {
                buckets
                    .iter()
                    .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
                    .sum::<u64>()
            })
            .sum();

        // Global aggregates across actual dimensions only. `points` counts
        // bucket cells (buckets x actual dimensions) and also acts as the
        // "first actual dimension" marker when seeding `min`.
        let mut min = 0;
        let mut max = 0;
        let mut points = 0;
        for (priority, dimension) in dimensions.iter().enumerate() {
            let id = String::from_utf8_lossy(dimension).into_owned();
            // Non-string display values are rendered via their JSON text.
            let display = match self.profile.field_display_value(
                context,
                DisplayScope::Histogram,
                field,
                dimension,
            ) {
                Value::String(value) => value,
                other => other.to_string(),
            };
            let (dimension_min, dimension_max, dimension_sum, actual) =
                histogram_dimension_stats(buckets, actual_dimensions, dimension);
            let dimension_average = if actual && !buckets.is_empty() {
                dimension_sum as f64 / buckets.len() as f64
            } else {
                0.0
            };
            if actual {
                if points == 0 || dimension_min < min {
                    min = dimension_min;
                }
                if dimension_max > max {
                    max = dimension_max;
                }
                points += buckets.len() as u64;
            }
            // Percentage of the grand total contributed by this dimension.
            let contribution = if total > 0 {
                dimension_sum as f64 * 100.0 / total as f64
            } else {
                0.0
            };

            ids.push(Value::String(id.clone()));
            names.push(Value::String(display.clone()));
            colors.push(Value::Null);
            units.push(Value::String("events".to_string()));
            result_labels.push(Value::String(display.clone()));
            min_values.push(Value::from(dimension_min));
            max_values.push(Value::from(dimension_max));
            avg_values.push(Value::from(dimension_average));
            arp_values.push(Value::from(0));
            con_values.push(Value::from(contribution));
            summary_dimensions.push(json!({
                "id": id,
                "nm": display,
                "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
                "sts": {
                    "min": dimension_min,
                    "max": dimension_max,
                    "avg": dimension_average,
                    "con": contribution,
                },
                "pri": priority as u64,
            }));
        }

        // Column-oriented per-dimension statistics (index-aligned with `ids`).
        let stats = json!({
            "min": min_values,
            "max": max_values,
            "avg": avg_values,
            "arp": arp_values,
            "con": con_values,
        });
        // Chart-wide statistics shared by the node/context/instance summaries.
        let summary_stats = json!({
            "min": min,
            "max": max,
            "avg": histogram_average(total, points),
            "con": 100.0,
        });
        let dimensions_len = dimensions.len() as u64;

        HistogramChartMetadata {
            ids,
            names,
            colors,
            units,
            stats,
            summary: json!({
                "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
                "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
                "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
                "dimensions": summary_dimensions,
                "labels": [],
                "alerts": [],
            }),
            totals: histogram_totals(dimensions_len),
            result_labels,
            min,
            max,
            points,
        }
    }
1104
1105    fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1106        NETDATA_ACCEPTED_PARAMS
1107            .iter()
1108            .copied()
1109            .chain(fields.iter().map(String::as_str))
1110            .map(|field| Value::String(field.to_string()))
1111            .collect()
1112    }
1113
1114    fn required_source_params(
1115        &self,
1116        paths: &[PathBuf],
1117        options: &NetdataFunctionRunOptions<'_>,
1118    ) -> Value {
1119        let mut all = JournalSourceSummary::default();
1120        let mut local = JournalSourceSummary::default();
1121        let mut local_namespaces = JournalSourceSummary::default();
1122        let mut local_system = JournalSourceSummary::default();
1123        let mut local_user = JournalSourceSummary::default();
1124        let mut remote = JournalSourceSummary::default();
1125        let mut other = JournalSourceSummary::default();
1126        let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1127
1128        for path in paths {
1129            let metadata = file_metadata(options, path);
1130            let source_type = metadata
1131                .as_ref()
1132                .and_then(|metadata| metadata.source_type)
1133                .unwrap_or_else(|| journal_file_source_type(path));
1134            all.add_path(path, self.config.reader_options, metadata.as_ref());
1135            if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1136                local.add_path(path, self.config.reader_options, metadata.as_ref());
1137            }
1138            if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1139                local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1140            }
1141            if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1142                local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1143            }
1144            if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1145                local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1146            }
1147            if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1148                remote.add_path(path, self.config.reader_options, metadata.as_ref());
1149            }
1150            if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1151                other.add_path(path, self.config.reader_options, metadata.as_ref());
1152            }
1153            let source_name = metadata
1154                .as_ref()
1155                .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1156                .or_else(|| journal_file_exact_source_name(path));
1157            if let Some(source_name) = source_name {
1158                exact.entry(source_name).or_default().add_path(
1159                    path,
1160                    self.config.reader_options,
1161                    metadata.as_ref(),
1162                );
1163            }
1164        }
1165
1166        let mut source_options = Vec::new();
1167        push_source_option(&mut source_options, "all", &all);
1168        push_source_option(&mut source_options, "all-local-logs", &local);
1169        push_source_option(
1170            &mut source_options,
1171            "all-local-namespaces",
1172            &local_namespaces,
1173        );
1174        push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1175        push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1176        push_source_option(&mut source_options, "all-remote-systems", &remote);
1177        push_source_option(&mut source_options, "all-uncategorized", &other);
1178        for (name, summary) in exact {
1179            push_source_option(&mut source_options, &name, &summary);
1180        }
1181
1182        json!([{
1183            "id": "__logs_sources",
1184            "name": "Journal Sources",
1185            "help": "Select the logs source to query",
1186            "type": "multiselect",
1187            "options": source_options,
1188        }])
1189    }
1190
1191    fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1192        let mut fields = combined.reportable_facet_fields(&request.facets);
1193        if request.data_only {
1194            if let Some(histogram) = &request.histogram {
1195                push_unique(&mut fields, histogram);
1196            }
1197        }
1198        let mut sorted = fields.clone();
1199        sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1200        let order_by_field: BTreeMap<String, usize> = sorted
1201            .into_iter()
1202            .enumerate()
1203            .map(|(index, field)| (field, index + 1))
1204            .collect();
1205
1206        fields
1207            .into_iter()
1208            .map(|field| {
1209                let order = order_by_field.get(&field).copied().unwrap_or(0);
1210                json!({
1211                    "id": field,
1212                    "name": field,
1213                    "order": order,
1214                })
1215            })
1216            .collect()
1217    }
1218}
1219
/// Chart metadata computed by `histogram_chart_metadata`.
///
/// The per-dimension vectors are index-aligned with each other, one entry
/// per emitted dimension.
struct HistogramChartMetadata {
    // Dimension ids (lossy UTF-8 of the raw value).
    ids: Vec<Value>,
    // Display names for each dimension.
    names: Vec<Value>,
    // Per-dimension colors (currently always `null`).
    colors: Vec<Value>,
    // Per-dimension units (always "events").
    units: Vec<Value>,
    // Column-oriented stats object: `min`/`max`/`avg`/`arp`/`con` arrays.
    stats: Value,
    // The `summary` section (nodes, contexts, instances, dimensions, ...).
    summary: Value,
    // The `totals` section.
    totals: Value,
    // Result column labels: "time" followed by one label per dimension.
    result_labels: Vec<Value>,
    // Minimum bucket count across actual dimensions.
    min: u64,
    // Maximum bucket count across actual dimensions.
    max: u64,
    // Number of bucket cells across actual dimensions (buckets x actual dimensions).
    points: u64,
}
1233
/// Returns `(min, max, sum, actual)` of one dimension's counts across
/// `buckets`.
///
/// A bucket that lacks the dimension counts as 0. If the dimension is not
/// in `actual_dimensions`, the result is `(0, 0, 0, false)`. With no buckets,
/// an actual dimension yields `(0, 0, 0, true)`.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }
    let (lowest, highest, total) = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
        .fold(None, |acc: Option<(u64, u64, u64)>, count| {
            Some(match acc {
                None => (count, count, count),
                Some((lo, hi, running)) => (lo.min(count), hi.max(count), running + count),
            })
        })
        .unwrap_or((0, 0, 0));
    (lowest, highest, total, true)
}
1257
/// Mean count per point, or `0.0` when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        count => total as f64 / count as f64,
    }
}
1265
1266fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1267    let mut node = json!({
1268        "mg": "default",
1269        "nm": "facets.histogram",
1270        "ni": 0,
1271        "st": { "ai": 0, "code": 200, "msg": "" },
1272    });
1273    add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1274    node
1275}
1276
1277fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1278    let mut context = json!({ "id": "facets.histogram" });
1279    add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1280    context
1281}
1282
1283fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1284    let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1285    add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1286    instance
1287}
1288
1289fn add_histogram_summary_shape(
1290    object: &mut Value,
1291    dimensions: u64,
1292    points: u64,
1293    stats: &Value,
1294    include_instances: bool,
1295) {
1296    let Some(map) = object.as_object_mut() else {
1297        return;
1298    };
1299    if dimensions > 0 {
1300        if include_instances {
1301            map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1302        }
1303        map.insert(
1304            "ds".to_string(),
1305            json!({ "sl": dimensions, "qr": dimensions }),
1306        );
1307    }
1308    if points > 0 {
1309        map.insert("sts".to_string(), stats.clone());
1310    }
1311}
1312
1313fn histogram_totals(dimensions: u64) -> Value {
1314    let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1315    if dimensions > 0 {
1316        if let Some(map) = totals.as_object_mut() {
1317            map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1318            map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1319            map.insert(
1320                "dimensions".to_string(),
1321                json!({ "sl": dimensions, "qr": dimensions }),
1322            );
1323        }
1324    }
1325    totals
1326}
1327
/// Converts a flag to `1` (true) or `0` (false).
fn bool_to_u64(value: bool) -> u64 {
    u64::from(value)
}
1331
1332fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1333    histogram
1334        .buckets
1335        .first()
1336        .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1337        .unwrap_or(0)
1338}
1339
1340fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1341    histogram
1342        .buckets
1343        .last()
1344        .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1345        .unwrap_or(0)
1346}
1347
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1354
/// A parsed and normalized Netdata function request (see `NetdataRequest::parse`).
#[derive(Debug, Clone)]
struct NetdataRequest {
    // The request's `info` flag.
    info: bool,
    // Normalized echo of the request as it was asked for.
    echo: Value,
    // Lower bound of the query window, in microseconds.
    after_realtime_usec: Option<u64>,
    // Upper bound of the query window, in microseconds.
    before_realtime_usec: Option<u64>,
    // The `if_modified_since` value (0 when absent).
    if_modified_since_usec: u64,
    // Where the query starts scanning.
    anchor: ExplorerAnchor,
    // Scan direction.
    direction: Direction,
    // Effective row limit (never below 2; the echo keeps the requested value).
    limit: usize,
    // Data-only mode; facet/histogram analysis is skipped unless `delta` is set.
    data_only: bool,
    delta: bool,
    tail: bool,
    // Sampling budget passed to the explorer (0 disables sampling).
    sampling: u64,
    // Bitmask of `SOURCE_TYPE_*` categories selected by the request.
    source_type: u64,
    // Exact source names selected in addition to the category mask.
    exact_sources: Vec<String>,
    filters: Vec<ExplorerFilter>,
    // Facet fields to compute.
    facets: Vec<Vec<u8>>,
    // Field to build the histogram for, if any.
    histogram: Option<String>,
    // Full-text search terms and positive/negative patterns parsed from `query`.
    fts_terms: Vec<ExplorerFtsPattern>,
    fts_patterns: Vec<Vec<u8>>,
    fts_negative_patterns: Vec<Vec<u8>>,
}
1378
impl NetdataRequest {
    /// Parses a Netdata function request payload into a `NetdataRequest`.
    ///
    /// Missing keys fall back to defaults:
    /// - `info` and `data_only`: false;
    /// - `if_modified_since`: 0;
    /// - `sampling`: `DEFAULT_ITEMS_SAMPLING`.
    ///
    /// The time window, direction, anchor, limit, facets, histogram,
    /// full-text query, source selection and filters are each derived by
    /// dedicated helpers. A normalized echo of the request, built from the
    /// values as requested, is stored alongside.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::InvalidPath` when `value` is not a JSON object.
    fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
        let object = value.as_object().ok_or_else(|| {
            SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
        })?;
        let now_seconds = unix_now_seconds();
        let info = get_bool(object, "info").unwrap_or(false);
        let after = get_i64(object, "after");
        let before = get_i64(object, "before");
        let (after_realtime_usec, before_realtime_usec) =
            normalize_time_window(now_seconds, after, before);
        let direction = request_direction(object);
        let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
        let data_only = get_bool(object, "data_only").unwrap_or(false);
        let delta = request_delta(data_only, object);
        let tail = request_tail(data_only, if_modified_since_usec, object);
        let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
        // The anchor helper also returns the direction to use, which shadows
        // the one parsed above.
        let (anchor, direction) = request_anchor_and_direction(
            object,
            tail,
            direction,
            after_realtime_usec,
            before_realtime_usec,
        );
        let requested_limit = request_limit(object);
        // The effective limit is floored at 2, while the echo reports the
        // requested limit. NOTE(review): the reason for the floor of 2 is not
        // visible here — confirm.
        let limit = requested_limit.max(2);
        // Requested facets/histogram are resolved against `config`; the raw
        // requested values are kept for the echo.
        let requested_facets = parse_string_array(object.get("facets"));
        let facets = request_facets(&requested_facets, config);
        let requested_histogram = request_histogram(object);
        let histogram = request_histogram_or_default(&requested_histogram, config);
        let requested_query = request_query(object);
        let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
            .as_deref()
            .map(parse_fts_query_patterns)
            .unwrap_or_default();
        // `selections` carries both the source selection and the field filters.
        let source_selection = parse_source_selection(object.get("selections"));
        let filters = parse_filters(object.get("selections"));

        let echo_input = RequestEchoInput {
            info,
            after_realtime_usec,
            before_realtime_usec,
            if_modified_since_usec,
            anchor,
            direction,
            limit: requested_limit,
            data_only,
            delta,
            tail,
            sampling,
            source_type: source_selection.source_type,
            requested_facets: requested_facets.as_deref(),
            selections: object.get("selections"),
            histogram: requested_histogram.as_deref(),
            query: requested_query.as_deref(),
        };
        let echo = normalized_request_echo(&echo_input);

        Ok(Self {
            info,
            echo,
            after_realtime_usec,
            before_realtime_usec,
            if_modified_since_usec,
            anchor,
            direction,
            limit,
            data_only,
            delta,
            tail,
            sampling,
            source_type: source_selection.source_type,
            exact_sources: source_selection.exact_sources,
            filters,
            facets,
            histogram,
            fts_terms,
            fts_patterns,
            fts_negative_patterns,
        })
    }

    /// Returns whether the journal file at `path` belongs to one of the
    /// selected sources.
    ///
    /// Checks are applied in this order:
    /// 1. A selection containing `SOURCE_TYPE_ALL` matches every file.
    /// 2. The file's source-type bits are tested against the selected bits.
    ///    The bits come from `metadata`, falling back to
    ///    `journal_file_source_type(path)`.
    /// 3. The file's exact source name is compared against `exact_sources`.
    fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
        if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
            return true;
        }
        if self.source_type & SOURCE_TYPE_ALL != 0 {
            return true;
        }
        let file_source_type = metadata
            .and_then(|metadata| metadata.source_type)
            .unwrap_or_else(|| journal_file_source_type(path));
        if file_source_type & self.source_type != 0 {
            return true;
        }
        if self.exact_sources.is_empty() {
            return false;
        }
        let source_name = metadata
            .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
            .or_else(|| journal_file_exact_source_name(path));
        self.exact_sources
            .iter()
            .any(|source| source_name.as_deref() == Some(source.as_str()))
    }

    /// Translates this request into an `ExplorerQuery`.
    ///
    /// - Facets, histogram and sampling ("analysis") are enabled unless the
    ///   request is data-only without `delta`.
    /// - A tail request with a realtime anchor replaces its lower bound with
    ///   `tail_after_realtime_bound` and scans from `ExplorerAnchor::Auto`.
    /// - A data-only backward page with a realtime anchor (and no tail)
    ///   replaces its upper bound with one that excludes the anchor, and also
    ///   scans from `Auto`.
    /// - Sampling is configured only when analysis is enabled, the budget is
    ///   non-zero, at least one file matched and both window bounds are set.
    ///   `file_header` supplies its file statistics; a missing header is
    ///   treated as all zeros.
    fn to_explorer_query(
        &self,
        matched_files: u64,
        file_header: Option<FileHeader>,
        realtime_slack_usec: u64,
    ) -> ExplorerQuery {
        let analysis_enabled = !self.data_only || self.delta;
        let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
        let backward_page_anchor = self.data_only
            && !tail_anchor
            && self.direction == Direction::Backward
            && matches!(self.anchor, ExplorerAnchor::Realtime(_));
        let after_realtime_usec = if tail_anchor {
            tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
        } else {
            self.after_realtime_usec
        };
        let before_realtime_usec = if backward_page_anchor {
            before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
        } else {
            self.before_realtime_usec
        };
        // Anchors already folded into the bounds above are replaced by `Auto`.
        let anchor = if tail_anchor || backward_page_anchor {
            ExplorerAnchor::Auto
        } else {
            self.anchor
        };
        let sampling = (analysis_enabled
            && self.sampling != 0
            && matched_files != 0
            && after_realtime_usec.is_some()
            && before_realtime_usec.is_some())
        .then(|| {
            let header = file_header.unwrap_or(FileHeader {
                signature: [0; 8],
                compatible_flags: 0,
                incompatible_flags: 0,
                state: 0,
                header_size: 0,
                n_entries: 0,
                head_entry_realtime: 0,
                tail_entry_realtime: 0,
                head_entry_seqnum: 0,
                tail_entry_seqnum: 0,
                tail_entry_boot_id: [0; 16],
                seqnum_id: [0; 16],
            });
            // Prefer the inclusive seqnum span (tail - head + 1) when both
            // seqnums are non-zero and the span does not overflow; otherwise
            // fall back to `n_entries`.
            let messages_in_file = header
                .tail_entry_seqnum
                .checked_sub(header.head_entry_seqnum)
                .and_then(|span| span.checked_add(1))
                .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
                .unwrap_or(header.n_entries);
            ExplorerSampling {
                budget: self.sampling,
                matched_files,
                file_head_realtime_usec: header.head_entry_realtime,
                file_tail_realtime_usec: header.tail_entry_realtime,
                file_head_seqnum: header.head_entry_seqnum,
                file_tail_seqnum: header.tail_entry_seqnum,
                file_entries: messages_in_file,
            }
        });
        ExplorerQuery {
            after_realtime_usec,
            before_realtime_usec,
            anchor,
            direction: self.direction,
            limit: self.limit,
            filters: self.filters.clone(),
            facets: analysis_enabled
                .then(|| self.facets.clone())
                .unwrap_or_default(),
            histogram: analysis_enabled
                .then(|| {
                    self.histogram
                        .as_ref()
                        .map(|field| field.as_bytes().to_vec())
                })
                .flatten(),
            histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
            fts_terms: self.fts_terms.clone(),
            fts_patterns: self.fts_patterns.clone(),
            fts_negative_patterns: self.fts_negative_patterns.clone(),
            field_mode: ExplorerFieldMode::FirstValue,
            // Set only when filters span more than one distinct field.
            exclude_facet_field_filters: self.distinct_filter_fields() > 1,
            use_source_realtime: true,
            realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
            // Data-only scans (other than tail scans) may stop once the rows are full.
            stop_when_rows_full: self.data_only && !tail_anchor,
            stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
            sampling,
            debug_collect_column_fields_by_row_traversal: false,
        }
    }

    /// Builds the explorer query for a single journal file.
    ///
    /// Uses the file's header and journal-vs-realtime delta. In data-only
    /// `delta` mode, early stopping on full rows is disabled. Presumably this
    /// is because delta mode still computes facets/histogram over the window,
    /// which an early stop would truncate — confirm.
    fn file_query(
        &self,
        matched_files: usize,
        file_header: FileHeader,
        order: &JournalFileOrderInfo,
    ) -> ExplorerQuery {
        let mut query = self.to_explorer_query(
            matched_files as u64,
            Some(file_header),
            order.journal_vs_realtime_delta_usec,
        );
        if self.data_only && self.delta {
            query.stop_when_rows_full = false;
        }
        query
    }

    /// Returns a copy of this request with filters, histogram, row limit and
    /// full-text search cleared.
    ///
    /// NOTE(review): presumably used to gather facet values independent of
    /// the current selection — verify against callers.
    fn unfiltered_vocabulary(&self) -> Self {
        let mut request = self.clone();
        request.filters.clear();
        request.histogram = None;
        request.limit = 0;
        request.fts_terms.clear();
        request.fts_patterns.clear();
        request.fts_negative_patterns.clear();
        request
    }

    /// Number of distinct fields referenced by the request's filters.
    fn distinct_filter_fields(&self) -> usize {
        self.filters
            .iter()
            .map(|filter| filter.field.as_slice())
            .collect::<HashSet<_>>()
            .len()
    }
}
1616
/// An explorer row paired with the path of the journal file it is associated with.
#[derive(Debug, Clone)]
struct LocatedRow {
    file_path: PathBuf,
    row: ExplorerRow,
}
1622
/// Adjusts row timestamps so that rows that collide in time get distinct
/// values in the scan direction.
#[derive(Debug)]
struct NetdataRealtimeAdjuster {
    direction: Direction,
    // Inclusive range of timestamps handed out since the last non-colliding
    // row (both start at 0).
    last_realtime_from: u64,
    last_realtime_to: u64,
}
1629
1630impl NetdataRealtimeAdjuster {
1631    fn new(direction: Direction) -> Self {
1632        Self {
1633            direction,
1634            last_realtime_from: 0,
1635            last_realtime_to: 0,
1636        }
1637    }
1638
1639    fn adjust(&mut self, realtime_usec: u64) -> u64 {
1640        match self.direction {
1641            Direction::Backward => {
1642                if realtime_usec >= self.last_realtime_from
1643                    && realtime_usec <= self.last_realtime_to
1644                {
1645                    self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1646                    self.last_realtime_from
1647                } else {
1648                    self.last_realtime_from = realtime_usec;
1649                    self.last_realtime_to = realtime_usec;
1650                    realtime_usec
1651                }
1652            }
1653            Direction::Forward => {
1654                if realtime_usec >= self.last_realtime_from
1655                    && realtime_usec <= self.last_realtime_to
1656                {
1657                    self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1658                    self.last_realtime_to
1659                } else {
1660                    self.last_realtime_from = realtime_usec;
1661                    self.last_realtime_to = realtime_usec;
1662                    realtime_usec
1663                }
1664            }
1665        }
1666    }
1667}
1668
1669#[derive(Debug, Default)]
1670struct JournalSourceSummary {
1671    files: u64,
1672    total_size: u64,
1673    first_realtime_usec: Option<u64>,
1674    last_realtime_usec: Option<u64>,
1675}
1676
1677impl JournalSourceSummary {
1678    #[cfg(test)]
1679    fn from_paths(
1680        paths: &[PathBuf],
1681        reader_options: ReaderOptions,
1682        options: &NetdataFunctionRunOptions<'_>,
1683    ) -> Self {
1684        let mut summary = Self::default();
1685        for path in paths {
1686            let metadata = file_metadata(options, path);
1687            summary.add_path(path, reader_options, metadata.as_ref());
1688        }
1689        summary
1690    }
1691
1692    fn add_path(
1693        &mut self,
1694        path: &Path,
1695        reader_options: ReaderOptions,
1696        metadata: Option<&NetdataJournalFileMetadata>,
1697    ) {
1698        if let Ok(metadata) = std::fs::metadata(path) {
1699            self.files = self.files.saturating_add(1);
1700            self.total_size = self.total_size.saturating_add(metadata.len());
1701        }
1702        if let Some(metadata) = metadata {
1703            let metadata_first = metadata.msg_first_realtime_usec;
1704            let metadata_last = metadata.msg_last_realtime_usec;
1705            if let Some(first) = metadata_first {
1706                self.first_realtime_usec = Some(
1707                    self.first_realtime_usec
1708                        .map_or(first, |current| current.min(first)),
1709                );
1710            }
1711            if let Some(last) = metadata_last {
1712                self.last_realtime_usec = Some(
1713                    self.last_realtime_usec
1714                        .map_or(last, |current| current.max(last)),
1715                );
1716            }
1717            if metadata_first.is_some() && metadata_last.is_some() {
1718                return;
1719            }
1720        }
1721        let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1722            return;
1723        };
1724        let header = reader.header();
1725        if header.head_entry_realtime != 0 {
1726            self.first_realtime_usec = Some(
1727                self.first_realtime_usec
1728                    .map_or(header.head_entry_realtime, |current| {
1729                        current.min(header.head_entry_realtime)
1730                    }),
1731            );
1732        }
1733        if header.tail_entry_realtime != 0 {
1734            self.last_realtime_usec = Some(
1735                self.last_realtime_usec
1736                    .map_or(header.tail_entry_realtime, |current| {
1737                        current.max(header.tail_entry_realtime)
1738                    }),
1739            );
1740        }
1741    }
1742
1743    fn info(&self) -> String {
1744        let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1745            (Some(first), Some(last)) if last >= first => {
1746                human_duration_seconds((last - first) / 1_000_000)
1747            }
1748            _ => "0s".to_string(),
1749        };
1750        let last_entry = self
1751            .last_realtime_usec
1752            .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1753            .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1754            .unwrap_or_else(|| "unknown".to_string());
1755        format!(
1756            "{} files, total size {}, covering {}, last entry at {}",
1757            self.files,
1758            human_binary_size(self.total_size),
1759            coverage,
1760            last_entry
1761        )
1762    }
1763}
1764
1765fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1766    if summary.files == 0 {
1767        return;
1768    }
1769    target.push(json!({
1770        "id": id,
1771        "name": id,
1772        "info": summary.info(),
1773        "pill": human_binary_size(summary.total_size),
1774    }));
1775}
1776
1777fn expand_located_row_payloads(
1778    located: &mut LocatedRow,
1779    reader_options: ReaderOptions,
1780) -> Result<()> {
1781    let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1782    reader.seek_cursor(&located.row.cursor)?;
1783    if !reader.test_cursor(&located.row.cursor)? {
1784        return Err(SdkError::InvalidCursor(format!(
1785            "selected row cursor is no longer available: {}",
1786            located.row.cursor
1787        )));
1788    }
1789    reader.collect_entry_payloads(&mut located.row.payloads)
1790}
1791
/// Accumulated result of running one query over several journal files.
#[derive(Debug, Default)]
struct CombinedResult {
    /// Rows kept so far, each tagged with the file it came from.
    rows: Vec<LocatedRow>,
    /// Facet counts as raw bytes: field name -> value -> count.
    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
    /// Histogram merged across files, if any file produced one.
    histogram: Option<ExplorerHistogram>,
    /// Column field names collected so far (only names that are valid UTF-8).
    column_fields: BTreeSet<String>,
    /// Explorer statistics accumulated over all merged files.
    stats: ExplorerStats,
    /// Page counters from a page window, if one was used; when `None` the response `items`
    /// counters fall back to values derived from `stats`.
    page_counters: Option<NetdataPageCounters>,
    /// Number of journal files reported as matched in `_journal_files`.
    matched_files: u64,
    /// Paths of the matched journal files (re-opened to collect zero-count facet values).
    matched_paths: Vec<PathBuf>,
    /// Number of journal files reported as skipped in `_journal_files`.
    skipped_files: u64,
    /// Human-readable error messages reported in `_journal_files.errors`.
    file_errors: Vec<String>,
    /// Whether the response is incomplete, e.g. a returned row's payloads could not be loaded.
    partial: bool,
    // NOTE(review): the three flags below are written outside this view; presumably they record
    // a query timeout, a cancellation and whether sampling was active — confirm at the writers.
    timed_out: bool,
    cancelled: bool,
    sampling_enabled: bool,
}
1809
/// Row counters describing where the returned page sits among all matching rows.
#[derive(Clone, Copy, Debug, Default)]
struct NetdataPageCounters {
    /// Rows inside the anchor range that were considered for the page.
    matched: u64,
    /// Rows counted as falling before the returned page.
    before: u64,
    /// Rows counted as falling after the returned page (`NetdataPageWindow` also adds rows it
    /// evicted from a full page here).
    after: u64,
}
1816
/// Position of a multi-file query, presumably used when reporting progress — confirm at callers.
#[derive(Clone, Copy)]
struct ProgressContext {
    /// Index of the file currently being processed (NOTE(review): 0- vs 1-based not visible here).
    current_file: usize,
    /// Total number of files in the query.
    total_files: usize,
    /// Moment the query started.
    started: Instant,
}
1823
1824#[derive(Debug)]
1825enum NetdataPageHeap {
1826    Backward(BinaryHeap<Reverse<u64>>),
1827    Forward(BinaryHeap<u64>),
1828}
1829
1830#[derive(Debug)]
1831struct NetdataPageWindow {
1832    direction: Direction,
1833    anchor_start_usec: Option<u64>,
1834    anchor_stop_usec: Option<u64>,
1835    limit: usize,
1836    heap: NetdataPageHeap,
1837    oldest_retained_usec: Option<u64>,
1838    newest_retained_usec: Option<u64>,
1839    matched: u64,
1840    skips_before: u64,
1841    skips_after: u64,
1842    shifts: u64,
1843}
1844
1845impl NetdataPageWindow {
1846    fn for_request(request: &NetdataRequest) -> Self {
1847        let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1848            (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1849            (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1850            _ => (None, None),
1851        };
1852        let heap = match request.direction {
1853            Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1854            Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1855        };
1856        Self {
1857            direction: request.direction,
1858            anchor_start_usec,
1859            anchor_stop_usec,
1860            limit: request.limit,
1861            heap,
1862            oldest_retained_usec: None,
1863            newest_retained_usec: None,
1864            matched: 0,
1865            skips_before: 0,
1866            skips_after: 0,
1867            shifts: 0,
1868        }
1869    }
1870
1871    fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1872        if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1873            return false;
1874        }
1875        if self.retained_len() < self.limit {
1876            return true;
1877        }
1878        self.oldest_retained_usec
1879            .zip(self.newest_retained_usec)
1880            .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1881    }
1882
1883    fn observe(&mut self, realtime_usec: u64) {
1884        if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1885            return;
1886        }
1887        self.matched = self.matched.saturating_add(1);
1888        match (&mut self.heap, self.direction) {
1889            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1890                if heap.len() < self.limit {
1891                    heap.push(Reverse(realtime_usec));
1892                    self.add_retained_bound(realtime_usec);
1893                    return;
1894                }
1895                let Some(Reverse(oldest)) = heap.peek().copied() else {
1896                    heap.push(Reverse(realtime_usec));
1897                    self.refresh_retained_bounds();
1898                    return;
1899                };
1900                if realtime_usec < oldest {
1901                    self.skips_after = self.skips_after.saturating_add(1);
1902                    return;
1903                }
1904                heap.pop();
1905                heap.push(Reverse(realtime_usec));
1906                self.refresh_retained_bounds();
1907                self.shifts = self.shifts.saturating_add(1);
1908            }
1909            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1910                if heap.len() < self.limit {
1911                    heap.push(realtime_usec);
1912                    self.add_retained_bound(realtime_usec);
1913                    return;
1914                }
1915                let Some(newest) = heap.peek().copied() else {
1916                    heap.push(realtime_usec);
1917                    self.refresh_retained_bounds();
1918                    return;
1919                };
1920                if realtime_usec > newest {
1921                    self.skips_before = self.skips_before.saturating_add(1);
1922                    return;
1923                }
1924                heap.pop();
1925                heap.push(realtime_usec);
1926                self.refresh_retained_bounds();
1927                self.shifts = self.shifts.saturating_add(1);
1928            }
1929            _ => {}
1930        }
1931    }
1932
1933    fn retained_len(&self) -> usize {
1934        match &self.heap {
1935            NetdataPageHeap::Backward(heap) => heap.len(),
1936            NetdataPageHeap::Forward(heap) => heap.len(),
1937        }
1938    }
1939
1940    fn add_retained_bound(&mut self, realtime_usec: u64) {
1941        self.oldest_retained_usec = Some(
1942            self.oldest_retained_usec
1943                .map_or(realtime_usec, |current| current.min(realtime_usec)),
1944        );
1945        self.newest_retained_usec = Some(
1946            self.newest_retained_usec
1947                .map_or(realtime_usec, |current| current.max(realtime_usec)),
1948        );
1949    }
1950
1951    fn refresh_retained_bounds(&mut self) {
1952        let (oldest, newest) = match &self.heap {
1953            NetdataPageHeap::Backward(heap) => heap
1954                .iter()
1955                .map(|Reverse(usec)| *usec)
1956                .fold((None, None), retained_bounds_fold),
1957            NetdataPageHeap::Forward(heap) => heap
1958                .iter()
1959                .copied()
1960                .fold((None, None), retained_bounds_fold),
1961        };
1962        self.oldest_retained_usec = oldest;
1963        self.newest_retained_usec = newest;
1964    }
1965
1966    fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1967        match self.direction {
1968            Direction::Backward => {
1969                if self
1970                    .anchor_start_usec
1971                    .is_some_and(|anchor| realtime_usec >= anchor)
1972                {
1973                    self.skips_before = self.skips_before.saturating_add(1);
1974                    return false;
1975                }
1976                if self
1977                    .anchor_stop_usec
1978                    .is_some_and(|anchor| realtime_usec <= anchor)
1979                {
1980                    self.skips_after = self.skips_after.saturating_add(1);
1981                    return false;
1982                }
1983            }
1984            Direction::Forward => {
1985                if self
1986                    .anchor_start_usec
1987                    .is_some_and(|anchor| realtime_usec <= anchor)
1988                {
1989                    self.skips_after = self.skips_after.saturating_add(1);
1990                    return false;
1991                }
1992                if self
1993                    .anchor_stop_usec
1994                    .is_some_and(|anchor| realtime_usec >= anchor)
1995                {
1996                    self.skips_before = self.skips_before.saturating_add(1);
1997                    return false;
1998                }
1999            }
2000        }
2001        true
2002    }
2003
2004    fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2005        match self.direction {
2006            Direction::Backward => {
2007                if self
2008                    .anchor_start_usec
2009                    .is_some_and(|anchor| realtime_usec >= anchor)
2010                {
2011                    return false;
2012                }
2013                if self
2014                    .anchor_stop_usec
2015                    .is_some_and(|anchor| realtime_usec <= anchor)
2016                {
2017                    return false;
2018                }
2019            }
2020            Direction::Forward => {
2021                if self
2022                    .anchor_start_usec
2023                    .is_some_and(|anchor| realtime_usec <= anchor)
2024                {
2025                    return false;
2026                }
2027                if self
2028                    .anchor_stop_usec
2029                    .is_some_and(|anchor| realtime_usec >= anchor)
2030                {
2031                    return false;
2032                }
2033            }
2034        }
2035        true
2036    }
2037
2038    fn counters(&self) -> NetdataPageCounters {
2039        NetdataPageCounters {
2040            matched: self.matched,
2041            before: self.skips_before,
2042            after: self.skips_after.saturating_add(self.shifts),
2043        }
2044    }
2045
2046    fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2047        if self.limit == 0 {
2048            return false;
2049        }
2050        match (&self.heap, self.direction) {
2051            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2052                if heap.len() < self.limit {
2053                    return false;
2054                }
2055                heap.peek().is_some_and(|Reverse(oldest)| {
2056                    realtime_usec < oldest.saturating_sub(slack_usec)
2057                })
2058            }
2059            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2060                if heap.len() < self.limit {
2061                    return false;
2062                }
2063                heap.peek()
2064                    .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2065            }
2066            _ => false,
2067        }
2068    }
2069}
2070
2071fn retained_bounds_fold(
2072    (oldest, newest): (Option<u64>, Option<u64>),
2073    realtime_usec: u64,
2074) -> (Option<u64>, Option<u64>) {
2075    (
2076        Some(oldest.map_or(realtime_usec, |current| current.min(realtime_usec))),
2077        Some(newest.map_or(realtime_usec, |current| current.max(realtime_usec))),
2078    )
2079}
2080
2081impl CombinedResult {
2082    fn merge(
2083        &mut self,
2084        path: &Path,
2085        result: ExplorerResult,
2086        direction: Direction,
2087        limit: usize,
2088    ) -> Result<()> {
2089        let ExplorerResult {
2090            rows,
2091            facets,
2092            histogram,
2093            stats,
2094            column_fields,
2095            ..
2096        } = result;
2097
2098        if let Some(histogram) = histogram {
2099            merge_histogram(&mut self.histogram, histogram)?;
2100        }
2101
2102        self.merge_stats(stats);
2103        for row in rows {
2104            self.rows.push(LocatedRow {
2105                file_path: path.to_path_buf(),
2106                row,
2107            });
2108        }
2109        for field in column_fields {
2110            if let Ok(field) = String::from_utf8(field) {
2111                self.column_fields.insert(field);
2112            }
2113        }
2114        for (field, values) in facets {
2115            let target = self.facets.entry(field).or_default();
2116            for (value, count) in values {
2117                add_netdata_facet_count(target, &value, count);
2118            }
2119        }
2120        self.sort_and_limit(direction, limit);
2121        Ok(())
2122    }
2123
2124    fn add_column_fields<I>(&mut self, fields: I)
2125    where
2126        I: IntoIterator<Item = String>,
2127    {
2128        self.column_fields.extend(fields);
2129    }
2130
2131    fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2132        match direction {
2133            Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2134            Direction::Backward => self
2135                .rows
2136                .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2137        }
2138        make_row_timestamps_unique(&mut self.rows, direction);
2139        if self.rows.len() > limit {
2140            self.rows.truncate(limit);
2141        }
2142        self.stats.rows_returned = self.rows.len() as u64;
2143    }
2144
2145    fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2146        if self.rows.is_empty() {
2147            self.stats.rows_returned = 0;
2148            return;
2149        }
2150
2151        let mut rows = Vec::with_capacity(self.rows.len());
2152        for mut located in self.rows.drain(..) {
2153            if !located.row.payloads.is_empty() {
2154                rows.push(located);
2155                continue;
2156            }
2157            match expand_located_row_payloads(&mut located, reader_options) {
2158                Ok(()) => {
2159                    self.stats.returned_row_expansions =
2160                        self.stats.returned_row_expansions.saturating_add(1);
2161                    rows.push(located);
2162                }
2163                Err(err) => {
2164                    self.partial = true;
2165                    self.file_errors.push(format!(
2166                        "{} cursor {}: {err}",
2167                        located.file_path.display(),
2168                        located.row.cursor
2169                    ));
2170                }
2171            }
2172        }
2173        self.rows = rows;
2174        self.stats.rows_returned = self.rows.len() as u64;
2175    }
2176
2177    fn merge_stats(&mut self, stats: ExplorerStats) {
2178        self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2179        self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2180        self.stats.facet_rows_matched = self
2181            .stats
2182            .facet_rows_matched
2183            .saturating_add(stats.facet_rows_matched);
2184        self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2185        self.stats.rows_unsampled = self
2186            .stats
2187            .rows_unsampled
2188            .saturating_add(stats.rows_unsampled);
2189        self.stats.rows_estimated = self
2190            .stats
2191            .rows_estimated
2192            .saturating_add(stats.rows_estimated);
2193        self.stats.sampling_sampled = self
2194            .stats
2195            .sampling_sampled
2196            .saturating_add(stats.sampling_sampled);
2197        self.stats.sampling_unsampled = self
2198            .stats
2199            .sampling_unsampled
2200            .saturating_add(stats.sampling_unsampled);
2201        self.stats.sampling_estimated = self
2202            .stats
2203            .sampling_estimated
2204            .saturating_add(stats.sampling_estimated);
2205        if stats.last_realtime_usec > self.stats.last_realtime_usec {
2206            self.stats.last_realtime_usec = stats.last_realtime_usec;
2207        }
2208        if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2209            self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2210        }
2211        self.stats.data_refs_seen = self
2212            .stats
2213            .data_refs_seen
2214            .saturating_add(stats.data_refs_seen);
2215        self.stats.data_refs_skipped = self
2216            .stats
2217            .data_refs_skipped
2218            .saturating_add(stats.data_refs_skipped);
2219        self.stats.data_payloads_loaded = self
2220            .stats
2221            .data_payloads_loaded
2222            .saturating_add(stats.data_payloads_loaded);
2223        self.stats.data_objects_classified = self
2224            .stats
2225            .data_objects_classified
2226            .saturating_add(stats.data_objects_classified);
2227        self.stats.data_cache_hits = self
2228            .stats
2229            .data_cache_hits
2230            .saturating_add(stats.data_cache_hits);
2231        self.stats.data_cache_misses = self
2232            .stats
2233            .data_cache_misses
2234            .saturating_add(stats.data_cache_misses);
2235        self.stats.payloads_decompressed = self
2236            .stats
2237            .payloads_decompressed
2238            .saturating_add(stats.payloads_decompressed);
2239        self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2240        self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2241        self.stats.histogram_updates = self
2242            .stats
2243            .histogram_updates
2244            .saturating_add(stats.histogram_updates);
2245        self.stats.returned_row_expansions = self
2246            .stats
2247            .returned_row_expansions
2248            .saturating_add(stats.returned_row_expansions);
2249        self.stats.early_stop_opportunities = self
2250            .stats
2251            .early_stop_opportunities
2252            .saturating_add(stats.early_stop_opportunities);
2253        self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2254    }
2255
2256    fn add_zero_count_facet_values(
2257        &mut self,
2258        vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2259    ) {
2260        for (field, values) in vocabulary {
2261            let target = self.facets.entry(field.clone()).or_default();
2262            for value in values.keys() {
2263                add_netdata_facet_count(target, value, 0);
2264            }
2265        }
2266    }
2267
2268    fn add_zero_count_facet_values_from_files(
2269        &mut self,
2270        fields: &[Vec<u8>],
2271        reader_options: ReaderOptions,
2272    ) {
2273        for path in &self.matched_paths {
2274            let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2275                continue;
2276            };
2277            for field in fields {
2278                let Ok(field_name) = std::str::from_utf8(field) else {
2279                    continue;
2280                };
2281                let Ok(values) = reader.query_unique(field_name) else {
2282                    continue;
2283                };
2284                if values.is_empty() {
2285                    continue;
2286                }
2287                let target = self.facets.entry(field.clone()).or_default();
2288                for value in values {
2289                    add_netdata_facet_count(target, &value, 0);
2290                }
2291            }
2292        }
2293    }
2294
2295    fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2296        let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2297        if let Some(histogram) = &request.histogram {
2298            report_fields.insert(histogram.as_bytes().to_vec());
2299        }
2300        for filter in &request.filters {
2301            if !report_fields.contains(&filter.field) {
2302                continue;
2303            }
2304            let target = self.facets.entry(filter.field.clone()).or_default();
2305            for value in &filter.values {
2306                add_netdata_facet_count(target, value, 0);
2307            }
2308        }
2309    }
2310
2311    fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2312        string_fields(&self.reportable_facet_fields_bytes(requested))
2313    }
2314
2315    fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2316        requested
2317            .iter()
2318            .filter(|field| {
2319                self.facets
2320                    .get(*field)
2321                    .is_some_and(facet_group_is_reportable)
2322            })
2323            .cloned()
2324            .collect()
2325    }
2326}
2327
2328fn netdata_function_error(status: u64, message: &str) -> Value {
2329    json!({
2330        "status": status,
2331        "errorMessage": message,
2332    })
2333}
2334
2335fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2336    if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2337        return Value::String("OK".to_string());
2338    }
2339
2340    let total = stats
2341        .rows_examined
2342        .saturating_add(stats.rows_unsampled)
2343        .saturating_add(stats.rows_estimated)
2344        .max(1);
2345    let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2346    let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2347    let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2348
2349    let mut title = String::new();
2350    let mut description = String::new();
2351    let mut status = "notice";
2352    if timed_out {
2353        title.push_str("Query timed-out, incomplete data. ");
2354        description.push_str(
2355            "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2356        );
2357        status = "warning";
2358    }
2359    if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2360        title.push_str(&format!("{real_percent:.2}% real data"));
2361        description.push_str(&format!(
2362            "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2363        ));
2364    }
2365    if stats.rows_unsampled != 0 {
2366        title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2367        description.push_str(&format!(
2368            "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2369        ));
2370    }
2371    if stats.rows_estimated != 0 {
2372        title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2373        description.push_str(&format!(
2374            "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2375        ));
2376    }
2377
2378    json!({
2379        "title": title,
2380        "status": status,
2381        "description": description,
2382    })
2383}
2384
2385fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2386    let mut stats = completed.clone();
2387    stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2388    stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2389    stats.facet_rows_matched = stats
2390        .facet_rows_matched
2391        .saturating_add(current.facet_rows_matched);
2392    stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2393    stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2394    stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2395    stats.sampling_sampled = stats
2396        .sampling_sampled
2397        .saturating_add(current.sampling_sampled);
2398    stats.sampling_unsampled = stats
2399        .sampling_unsampled
2400        .saturating_add(current.sampling_unsampled);
2401    stats.sampling_estimated = stats
2402        .sampling_estimated
2403        .saturating_add(current.sampling_estimated);
2404    if current.last_realtime_usec > stats.last_realtime_usec {
2405        stats.last_realtime_usec = current.last_realtime_usec;
2406    }
2407    if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2408        stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2409    }
2410    stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2411    stats.data_refs_skipped = stats
2412        .data_refs_skipped
2413        .saturating_add(current.data_refs_skipped);
2414    stats.data_payloads_loaded = stats
2415        .data_payloads_loaded
2416        .saturating_add(current.data_payloads_loaded);
2417    stats.data_objects_classified = stats
2418        .data_objects_classified
2419        .saturating_add(current.data_objects_classified);
2420    stats.data_cache_hits = stats
2421        .data_cache_hits
2422        .saturating_add(current.data_cache_hits);
2423    stats.data_cache_misses = stats
2424        .data_cache_misses
2425        .saturating_add(current.data_cache_misses);
2426    stats.payloads_decompressed = stats
2427        .payloads_decompressed
2428        .saturating_add(current.payloads_decompressed);
2429    stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2430    stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2431    stats.histogram_updates = stats
2432        .histogram_updates
2433        .saturating_add(current.histogram_updates);
2434    stats.returned_row_expansions = stats
2435        .returned_row_expansions
2436        .saturating_add(current.returned_row_expansions);
2437    stats.early_stop_opportunities = stats
2438        .early_stop_opportunities
2439        .saturating_add(current.early_stop_opportunities);
2440    stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2441    stats
2442}
2443
/// Column definitions of a table response.
struct Columns {
    /// Column names in order (NOTE(review): presumably display order — confirm where built).
    order: Vec<String>,
    /// Column name -> column definition; emitted as the response's `columns` object.
    map: Map<String, Value>,
}
2448
/// Pieces of a query response that are computed before the final JSON is assembled.
struct QueryResponseArtifacts {
    /// Names of the facet fields considered reportable for this response.
    reportable_facet_field_names: Vec<String>,
    /// Column definitions; `columns.map` becomes the response's `columns`.
    columns: Columns,
    /// Table rows; emitted as the response's `data`.
    data: Vec<Value>,
    /// Facets JSON.
    facets: Value,
    /// Histogram JSON, if one was produced.
    histogram: Option<Value>,
    /// Response message (presumably built by `query_message` — confirm where assigned).
    message: Value,
    /// Response item counters (presumably built by `response_items` — confirm where assigned).
    items: Value,
}
2458
2459fn base_query_response(
2460    request: &NetdataRequest,
2461    combined: &CombinedResult,
2462    artifacts: &QueryResponseArtifacts,
2463) -> Value {
2464    json!({
2465        "_request": &request.echo,
2466        "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2467        "_journal_files": {
2468            "matched": combined.matched_files,
2469            "skipped": combined.skipped_files,
2470            "errors": &combined.file_errors,
2471        },
2472        "status": 200,
2473        "partial": combined.partial,
2474        "type": "table",
2475        "show_ids": true,
2476        "has_history": true,
2477        "pagination": {
2478            "enabled": true,
2479            "key": "anchor",
2480            "column": "timestamp",
2481            "units": "timestamp_usec",
2482        },
2483        "columns": &artifacts.columns.map,
2484        "data": &artifacts.data,
2485        "_stats": {
2486            "sdk_explorer": &combined.stats,
2487        },
2488        "expires": if request.data_only {
2489            unix_now_seconds().saturating_add(3600)
2490        } else {
2491            0
2492        }
2493    })
2494}
2495
2496fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2497    let unsampled = combined.stats.rows_unsampled;
2498    let estimated = combined.stats.rows_estimated;
2499    let fallback_rows_after_returned =
2500        response_fallback_rows_after_returned(&combined.stats, returned);
2501    let page_counters = combined
2502        .page_counters
2503        .unwrap_or_else(|| NetdataPageCounters {
2504            matched: combined.stats.rows_matched,
2505            before: 0,
2506            after: fallback_rows_after_returned,
2507        });
2508    json!({
2509        "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2510        "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2511        "unsampled": unsampled,
2512        "estimated": estimated,
2513        "returned": returned,
2514        "max_to_return": request.limit as u64,
2515        "before": page_counters.before,
2516        "after": page_counters.after,
2517    })
2518}
2519
2520fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2521    let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2522        stats.rows_examined
2523    } else {
2524        stats.rows_matched
2525    };
2526    source_rows.saturating_sub(returned)
2527}
2528
2529fn add_last_modified_if_needed(
2530    object: &mut Map<String, Value>,
2531    request: &NetdataRequest,
2532    combined: &CombinedResult,
2533) {
2534    if !request.data_only || request.tail {
2535        object.insert(
2536            "last_modified".to_string(),
2537            Value::from(combined.stats.last_realtime_usec),
2538        );
2539    }
2540}
2541
2542fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2543    if combined.sampling_enabled {
2544        object.insert(
2545            "_sampling".to_string(),
2546            json!({
2547                "enabled": true,
2548                "sampled": combined.stats.sampling_sampled,
2549                "unsampled": combined.stats.sampling_unsampled,
2550                "estimated": combined.stats.sampling_estimated,
2551            }),
2552        );
2553    }
2554}
2555
2556fn add_analysis_outputs_if_needed(
2557    object: &mut Map<String, Value>,
2558    request: &NetdataRequest,
2559    artifacts: QueryResponseArtifacts,
2560) {
2561    if !request.data_only || request.delta {
2562        let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2563        object.insert(facets_key.to_string(), artifacts.facets);
2564        object.insert(
2565            histogram_key.to_string(),
2566            artifacts.histogram.unwrap_or(Value::Null),
2567        );
2568        object.insert(items_key.to_string(), artifacts.items);
2569    }
2570}
2571
/// Returns the `(facets, histogram, items)` response keys: the `*_delta`
/// variants for data-only responses, the plain names otherwise.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2579
2580fn merge_histogram(
2581    target: &mut Option<ExplorerHistogram>,
2582    source: ExplorerHistogram,
2583) -> Result<()> {
2584    let Some(target) = target else {
2585        *target = Some(source);
2586        return Ok(());
2587    };
2588    if target.field != source.field
2589        || target.buckets.len() != source.buckets.len()
2590        || target
2591            .buckets
2592            .iter()
2593            .zip(source.buckets.iter())
2594            .any(|(target_bucket, source_bucket)| {
2595                target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2596                    || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2597            })
2598    {
2599        return Err(SdkError::Unsupported(
2600            "inconsistent Netdata histogram bucket shape",
2601        ));
2602    }
2603    for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2604        let Some(target_bucket) = target.buckets.get_mut(index) else {
2605            return Err(SdkError::Unsupported(
2606                "inconsistent Netdata histogram bucket shape",
2607            ));
2608        };
2609        for (value, count) in source_bucket.values {
2610            *target_bucket.values.entry(value).or_default() += count;
2611        }
2612    }
2613    Ok(())
2614}
2615
/// A facet group is worth reporting when it has at least one value other than
/// the empty string and the `-` placeholder.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    values
        .keys()
        .any(|value| !matches!(value.as_slice(), b"" | b"-"))
}
2621
2622fn netdata_facet_value(value: &[u8]) -> &[u8] {
2623    if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2624        &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2625    } else {
2626        value
2627    }
2628}
2629
2630fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2631    *target
2632        .entry(netdata_facet_value(value).to_vec())
2633        .or_default() += count;
2634}
2635
2636fn not_modified_before_scan_response(
2637    request: &NetdataRequest,
2638    selected: &SelectedJournalFiles,
2639) -> Option<Value> {
2640    if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2641        Some(netdata_function_error(
2642            304,
2643            "No new data since the previous call.",
2644        ))
2645    } else {
2646        None
2647    }
2648}
2649
2650fn should_collect_unfiltered_facet_vocabulary(
2651    request: &NetdataRequest,
2652    combined: &CombinedResult,
2653) -> bool {
2654    !request.data_only && !combined.partial && !request.filters.is_empty()
2655}
2656
2657fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2658    ProgressContext {
2659        current_file: file_index + 1,
2660        total_files,
2661        started,
2662    }
2663}
2664
2665fn emit_netdata_progress(
2666    options: &mut NetdataFunctionRunOptions<'_>,
2667    progress: NetdataFunctionProgress,
2668) {
2669    if let Some(callback) = options.progress_callback.as_deref_mut() {
2670        callback(progress);
2671    }
2672}
2673
2674fn emit_progress_for_combined(
2675    options: &mut NetdataFunctionRunOptions<'_>,
2676    combined: &CombinedResult,
2677    context: ProgressContext,
2678) {
2679    emit_netdata_progress(
2680        options,
2681        NetdataFunctionProgress {
2682            current_file: context.current_file,
2683            total_files: context.total_files,
2684            matched_files: combined.matched_files,
2685            skipped_files: combined.skipped_files,
2686            stats: combined.stats.clone(),
2687            elapsed: context.started.elapsed(),
2688        },
2689    );
2690}
2691
2692fn emit_explorer_progress(
2693    options: &mut NetdataFunctionRunOptions<'_>,
2694    combined: &CombinedResult,
2695    progress: ExplorerProgress,
2696    context: ProgressContext,
2697) {
2698    let stats = merged_progress_stats(&combined.stats, &progress.stats);
2699    if let Some(callback) = options.progress_callback.as_deref_mut() {
2700        callback(NetdataFunctionProgress {
2701            current_file: context.current_file,
2702            total_files: context.total_files,
2703            matched_files: combined.matched_files,
2704            skipped_files: combined.skipped_files,
2705            stats,
2706            elapsed: context.started.elapsed(),
2707        });
2708    }
2709}
2710
2711fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2712    options
2713        .cancellation_callback
2714        .is_some_and(|is_cancelled| is_cancelled())
2715}
2716
2717fn should_stop_before_file(
2718    combined: &mut CombinedResult,
2719    deadline: Option<Instant>,
2720    options: &NetdataFunctionRunOptions<'_>,
2721) -> bool {
2722    if request_cancelled(options) {
2723        combined.partial = true;
2724        combined.cancelled = true;
2725        return true;
2726    }
2727    if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2728        combined.partial = true;
2729        combined.timed_out = true;
2730        return true;
2731    }
2732    false
2733}
2734
2735fn collect_column_fields_for_file(
2736    reader: &mut FileReader,
2737    request: &NetdataRequest,
2738    path: &Path,
2739    combined: &mut CombinedResult,
2740) {
2741    if request.data_only {
2742        return;
2743    }
2744    match reader.enumerate_fields_indexed() {
2745        Ok(fields) => combined.add_column_fields(fields),
2746        Err(err) => combined.file_errors.push(format!(
2747            "{}: FIELD index enumeration failed: {err}",
2748            path.display()
2749        )),
2750    }
2751}
2752
2753fn record_explore_result(
2754    result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2755    path: &Path,
2756    combined: &mut CombinedResult,
2757) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2758    match result {
2759        Ok(result) => Some(result),
2760        Err(err) => {
2761            combined.skipped_files = combined.skipped_files.saturating_add(1);
2762            combined
2763                .file_errors
2764                .push(format!("{}: {err}", path.display()));
2765            None
2766        }
2767    }
2768}
2769
2770fn delta_scan_can_stop(
2771    request: &NetdataRequest,
2772    page_window: &RefCell<NetdataPageWindow>,
2773    realtime_usec: u64,
2774    rows_matched: u64,
2775    realtime_delta_usec: u64,
2776) -> bool {
2777    let mut page_window = page_window.borrow_mut();
2778    page_window.observe(realtime_usec);
2779    request.data_only
2780        && request.delta
2781        && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2782        && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2783}
2784
/// Folds one explored file's result into `combined` and decides whether the
/// scan is done.
///
/// Steps, in order:
/// 1. Persist a larger learned journal-vs-realtime delta for this file.
/// 2. Merge `result` into `combined`; merge errors are propagated with `?`.
/// 3. Emit file-level progress.
/// 4. If cancellation was requested, mark `combined` partial + cancelled and return `Ok(true)`.
/// 5. If the explorer stopped early (`stop_reason`), mark `combined` partial,
///    record timed-out/cancelled accordingly, and return `Ok(true)`.
/// 6. Otherwise return `Ok(true)` only for plain data-only requests (not delta,
///    not tail) whose retained rows already fill `request.limit` and where the
///    remaining files cannot affect the page. In every other case return `Ok(false)`.
///
/// NOTE(review): the caller presumably stops iterating files on `Ok(true)` — confirm.
// The argument list mirrors the state threaded through the per-file loop;
// bundling it into a struct is a larger refactor, hence the allow.
#[allow(clippy::too_many_arguments)]
fn finish_explored_file(
    options: &mut NetdataFunctionRunOptions<'_>,
    request: &NetdataRequest,
    file: &SelectedJournalFile,
    query: &ExplorerQuery,
    result: ExplorerResult,
    stop_reason: Option<ExplorerStopReason>,
    combined: &mut CombinedResult,
    files: &[SelectedJournalFile],
    file_index: usize,
    progress: ProgressContext,
) -> Result<bool> {
    update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
    combined.merge(&file.path, result, query.direction, request.limit)?;
    emit_progress_for_combined(options, combined, progress);
    // Re-check cancellation after the (potentially slow) merge and callback.
    if request_cancelled(options) {
        combined.partial = true;
        combined.cancelled = true;
        return Ok(true);
    }
    if let Some(reason) = stop_reason {
        combined.partial = true;
        match reason {
            ExplorerStopReason::TimedOut => combined.timed_out = true,
            ExplorerStopReason::Cancelled => combined.cancelled = true,
        }
        return Ok(true);
    }
    // Early exit for plain data-only pages: the page is full and the next file
    // (by the selection order) lies entirely outside the retained time range.
    Ok(request.data_only
        && !request.delta
        && !request.tail
        && combined.rows.len() >= request.limit
        && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
}
2820
2821fn file_metadata(
2822    options: &NetdataFunctionRunOptions<'_>,
2823    path: &Path,
2824) -> Option<NetdataJournalFileMetadata> {
2825    options
2826        .state
2827        .as_deref()
2828        .and_then(|state| state.file_metadata(path))
2829}
2830
2831fn update_learned_realtime_delta(
2832    options: &mut NetdataFunctionRunOptions<'_>,
2833    path: &Path,
2834    order: &JournalFileOrderInfo,
2835    stats: &ExplorerStats,
2836) {
2837    let learned_delta = stats.max_source_realtime_delta_usec;
2838    if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2839        return;
2840    }
2841    let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2842    if learned_delta <= order.journal_vs_realtime_delta_usec {
2843        return;
2844    }
2845    if let Some(state) = options.state.as_deref_mut() {
2846        state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2847    }
2848}
2849
2850fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2851    delta_usec
2852        .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2853        .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2854}
2855
/// Test-only header-based overlap check.
///
/// A file with an unknown tail timestamp (0) always overlaps. Otherwise the
/// file's entry range is widened by the maximum journal-vs-realtime delta on
/// both sides and compared with the request's `after`/`before` bounds.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }
    let slack = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC;
    let earliest = header.head_entry_realtime.saturating_sub(slack);
    let latest = header.tail_entry_realtime.saturating_add(slack);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);
    !(ends_before_window || starts_after_window)
}
2884
/// A journal file chosen for a request, together with the ordering/timing
/// information used to sort it and to decide early stops.
#[derive(Debug)]
struct SelectedJournalFile {
    path: PathBuf,
    order: JournalFileOrderInfo,
}
2890
/// Result of journal-file selection for a request.
#[derive(Debug, Default)]
struct SelectedJournalFiles {
    /// Files matching the request's sources and time range, sorted for the
    /// request's direction (ties broken by path).
    files: Vec<SelectedJournalFile>,
    /// Whether any selected file's last message is newer than the request's
    /// `if_modified_since_usec`.
    files_are_newer: bool,
}
2896
2897fn select_journal_files_for_request(
2898    paths: Vec<PathBuf>,
2899    request: &NetdataRequest,
2900    reader_options: ReaderOptions,
2901    options: &NetdataFunctionRunOptions<'_>,
2902) -> SelectedJournalFiles {
2903    let mut selected = Vec::new();
2904    for path in paths {
2905        let metadata = file_metadata(options, &path);
2906        if !request.matches_source(&path, metadata.as_ref()) {
2907            continue;
2908        }
2909        let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2910        if !journal_file_order_may_overlap_request(&order, request) {
2911            continue;
2912        }
2913        selected.push(SelectedJournalFile { path, order });
2914    }
2915    selected.sort_by(|left, right| {
2916        compare_journal_file_order(&left.order, &right.order, request.direction)
2917            .then_with(|| left.path.cmp(&right.path))
2918    });
2919    let files_are_newer = selected
2920        .iter()
2921        .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2922    SelectedJournalFiles {
2923        files: selected,
2924        files_are_newer,
2925    }
2926}
2927
2928fn remaining_files_cannot_affect_data_page(
2929    combined: &CombinedResult,
2930    request: &NetdataRequest,
2931    files: &[SelectedJournalFile],
2932    next_file_index: usize,
2933) -> bool {
2934    let Some(next) = files.get(next_file_index) else {
2935        return true;
2936    };
2937    let slack = next.order.journal_vs_realtime_delta_usec;
2938    match request.direction {
2939        Direction::Backward => {
2940            let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2941            else {
2942                return false;
2943            };
2944            next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2945        }
2946        Direction::Forward => {
2947            let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2948            else {
2949                return false;
2950            };
2951            next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2952        }
2953    }
2954}
2955
2956fn journal_file_order_may_overlap_request(
2957    info: &JournalFileOrderInfo,
2958    request: &NetdataRequest,
2959) -> bool {
2960    if info.msg_last_realtime_usec == 0 {
2961        return true;
2962    }
2963
2964    let first = info
2965        .msg_first_realtime_usec
2966        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2967    let last = info
2968        .msg_last_realtime_usec
2969        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2970
2971    if request
2972        .after_realtime_usec
2973        .is_some_and(|after| last < after)
2974    {
2975        return false;
2976    }
2977    if request
2978        .before_realtime_usec
2979        .is_some_and(|before| first > before)
2980    {
2981        return false;
2982    }
2983
2984    true
2985}
2986
/// Finds the earliest realtime timestamp (usec) of each boot in `needed_boot_ids`
/// across the journal files in `paths`.
///
/// Files that fail to open, or whose unique `_BOOT_ID` values cannot be
/// queried, are silently skipped (best effort). Boot IDs that never yield an
/// entry are absent from the result. Returns an empty map immediately when
/// nothing is needed.
fn collect_boot_first_realtime(
    paths: &[PathBuf],
    reader_options: ReaderOptions,
    needed_boot_ids: &BTreeSet<Vec<u8>>,
) -> BTreeMap<Vec<u8>, u64> {
    let mut out = BTreeMap::new();
    if needed_boot_ids.is_empty() {
        return out;
    }
    for path in paths {
        let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
            continue;
        };
        let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
            continue;
        };
        for boot_id in boot_ids {
            if !needed_boot_ids.contains(&boot_id) {
                continue;
            }
            // Restrict the reader to this boot only; earlier matches are flushed first.
            let mut match_payload = b"_BOOT_ID=".to_vec();
            match_payload.extend_from_slice(&boot_id);
            reader.flush_matches();
            reader.add_match(&match_payload);
            // NOTE(review): presumably positions before the first matching entry,
            // so the `next()` below lands on this boot's first entry in the file.
            reader.seek_head();
            // A read error is treated the same as "no entry for this boot".
            if !reader.next().unwrap_or(false) {
                continue;
            }
            if let Ok(realtime) = reader.get_realtime_usec() {
                // Keeps the minimum across files.
                record_boot_first_realtime(&mut out, boot_id, realtime);
            }
        }
        reader.flush_matches();
    }
    out
}
3023
3024fn response_boot_ids(
3025    column_order: &[String],
3026    rows: &[LocatedRow],
3027    facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3028    histogram: Option<&ExplorerHistogram>,
3029) -> BTreeSet<Vec<u8>> {
3030    let mut boot_ids = BTreeSet::new();
3031    let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3032    if row_needs_boot_id {
3033        for row in rows {
3034            if let Some(values) = row_fields(row).get("_BOOT_ID") {
3035                boot_ids.extend(values.iter().cloned());
3036            }
3037        }
3038    }
3039    if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3040        boot_ids.extend(
3041            values
3042                .keys()
3043                .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3044                .cloned(),
3045        );
3046    }
3047    if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3048        for bucket in &histogram.buckets {
3049            boot_ids.extend(
3050                bucket
3051                    .values
3052                    .keys()
3053                    .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3054                    .cloned(),
3055            );
3056        }
3057    }
3058    boot_ids
3059}
3060
/// Records `realtime_usec` for `boot_id`, keeping the smallest value seen.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
3071
3072fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3073    let mut fields = BTreeMap::new();
3074    for payload in &row.row.payloads {
3075        let Some((field, value)) = split_payload(payload) else {
3076            continue;
3077        };
3078        fields
3079            .entry(String::from_utf8_lossy(field).into_owned())
3080            .or_insert_with(Vec::new)
3081            .push(value.to_vec());
3082    }
3083    fields.insert(
3084        "ND_JOURNAL_FILE".to_string(),
3085        vec![row.file_path.display().to_string().into_bytes()],
3086    );
3087    if !fields.contains_key("ND_JOURNAL_PROCESS") {
3088        let process = dynamic_process_name(&fields);
3089        if !process.is_empty() {
3090            fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3091        }
3092    }
3093    fields
3094}
3095
/// Derives the synthetic `ND_JOURNAL_PROCESS` value for a row.
///
/// The identifier is the first *non-empty* value among `CONTAINER_NAME`,
/// `SYSLOG_IDENTIFIER` and `_COMM`. A `[<_PID>]` suffix is appended when a
/// non-empty `_PID` exists. Returns `"-"` when no identifier is available.
///
/// Previously, a present-but-empty `CONTAINER_NAME` (or `SYSLOG_IDENTIFIER`)
/// stopped the fallback chain and produced `"-"` even when a later identifier
/// was available. Empty values are now treated like missing ones.
fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
    // First value of `field`, with an empty value treated as absent.
    let non_empty = |field: &str| -> Option<String> {
        fields
            .get(field)
            .and_then(|values| values.first())
            .filter(|value| !value.is_empty())
            .map(|value| String::from_utf8_lossy(value).into_owned())
    };

    let Some(base) = ["CONTAINER_NAME", "SYSLOG_IDENTIFIER", "_COMM"]
        .iter()
        .copied()
        .find_map(&non_empty)
    else {
        return "-".to_string();
    };
    match non_empty("_PID") {
        Some(pid) => format!("{base}[{pid}]"),
        None => base,
    }
}
3111
3112fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3113    let mut last_from = 0u64;
3114    let mut last_to = 0u64;
3115    let mut initialized = false;
3116    for row in rows {
3117        let timestamp = row.row.realtime_usec;
3118        if initialized && timestamp >= last_from && timestamp <= last_to {
3119            match direction {
3120                Direction::Backward => {
3121                    last_from = last_from.saturating_sub(1);
3122                    row.row.realtime_usec = last_from;
3123                }
3124                Direction::Forward => {
3125                    last_to = last_to.saturating_add(1);
3126                    row.row.realtime_usec = last_to;
3127                }
3128            }
3129        } else {
3130            last_from = timestamp;
3131            last_to = timestamp;
3132            initialized = true;
3133        }
3134    }
3135}
3136
/// Returns the first value recorded for `field`, if the field exists and has
/// at least one value.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    values.first().map(|value| value.as_slice())
}
3143
/// Splits a `FIELD=value` payload at its first `=`.
///
/// Returns `None` when the payload contains no `=`. The value may itself
/// contain further `=` bytes.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut parts = payload.splitn(2, |byte| *byte == b'=');
    let field = parts.next()?;
    let value = parts.next()?;
    Some((field, value))
}
3148
3149fn column_metadata(key: &str, index: usize) -> Value {
3150    let (visible, filter, full_width) = match key {
3151        "timestamp" => (true, "range", false),
3152        "rowOptions" => (false, "none", false),
3153        "_HOSTNAME" => (true, "facet", false),
3154        "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3155        "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3156        _ if systemd_column_is_facet(key) => (false, "facet", false),
3157        _ => (false, "none", false),
3158    };
3159    let column_type = if key == "timestamp" {
3160        "timestamp"
3161    } else if key == "rowOptions" {
3162        "none"
3163    } else {
3164        "string"
3165    };
3166    let visualization = if key == "rowOptions" {
3167        "rowOptions"
3168    } else {
3169        "value"
3170    };
3171    let mut metadata = json!({
3172        "index": index,
3173        "unique_key": key == "timestamp",
3174        "name": if key == "timestamp" { "Timestamp" } else { key },
3175        "visible": visible,
3176        "type": column_type,
3177        "visualization": visualization,
3178        "value_options": {
3179            "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3180            "decimal_points": 0,
3181            "default_value": if key == "timestamp" || key == "rowOptions" {
3182                Value::Null
3183            } else {
3184                Value::String("-".to_string())
3185            },
3186        },
3187        "sort": "ascending",
3188        "sortable": false,
3189        "sticky": false,
3190        "summary": "count",
3191        "filter": filter,
3192        "full_width": full_width,
3193        "wrap": key != "rowOptions",
3194        "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3195    });
3196    if key == "rowOptions" {
3197        if let Some(object) = metadata.as_object_mut() {
3198            object.insert("dummy".to_string(), Value::Bool(true));
3199        }
3200    }
3201    metadata
3202}
3203
/// Returns whether a journal field should be offered as a facet.
///
/// `MESSAGE_ID` always is. Otherwise fields containing `MESSAGE` or
/// `TIMESTAMP`, and trusted `__`-prefixed fields, are excluded.
fn systemd_column_is_facet(key: &str) -> bool {
    key == "MESSAGE_ID"
        || !(key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__"))
}
3213
3214fn sort_facet_options(field: &str, options: &mut [Value]) {
3215    options.sort_by(|left, right| {
3216        let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3217        let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3218        if field == "PRIORITY" {
3219            return parse_priority(left_id).cmp(&parse_priority(right_id));
3220        }
3221        let left_count = left
3222            .get("count")
3223            .and_then(Value::as_u64)
3224            .unwrap_or_default();
3225        let right_count = right
3226            .get("count")
3227            .and_then(Value::as_u64)
3228            .unwrap_or_default();
3229        right_count
3230            .cmp(&left_count)
3231            .then_with(|| left_id.cmp(right_id))
3232    });
3233}
3234
3235fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3236    let bytes = query.as_bytes();
3237    let mut index = 0usize;
3238    let mut terms = Vec::new();
3239    let mut positives = Vec::new();
3240    let mut negatives = Vec::new();
3241
3242    while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3243        push_fts_pattern(
3244            pattern,
3245            negative,
3246            &mut terms,
3247            &mut positives,
3248            &mut negatives,
3249        );
3250    }
3251
3252    (terms, positives, negatives)
3253}
3254
3255fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3256    while *index < bytes.len() {
3257        skip_fts_separators(bytes, index);
3258        let negative = consume_fts_negative_marker(bytes, index);
3259        let pattern = read_fts_pattern(bytes, index);
3260        if !pattern.is_empty() {
3261            return Some((pattern, negative));
3262        }
3263    }
3264    None
3265}
3266
/// Advances `*index` past any run of `|` separators. It is left unchanged
/// when it already points at or past the end of input.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    let skipped = bytes
        .get(*index..)
        .map_or(0, |rest| rest.iter().take_while(|&&byte| byte == b'|').count());
    *index += skipped;
}
3272
/// Consumes a `!` negation marker at `*index`, if present, and reports whether
/// one was consumed.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let negative = matches!(bytes.get(*index), Some(&b'!'));
    if negative {
        *index += 1;
    }
    negative
}
3281
/// Reads one pattern up to (and consuming) the next unescaped `|`.
///
/// A `\` makes the following byte literal. A trailing lone `\` is dropped.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut pattern = Vec::new();
    let mut pending_escape = false;
    while let Some(&byte) = bytes.get(*index) {
        *index += 1;
        match (byte, pending_escape) {
            (b'\\', false) => pending_escape = true,
            (b'|', false) => break,
            _ => {
                pattern.push(byte);
                pending_escape = false;
            }
        }
    }
    pattern
}
3300
3301fn push_fts_pattern(
3302    pattern: Vec<u8>,
3303    negative: bool,
3304    terms: &mut Vec<ExplorerFtsPattern>,
3305    positives: &mut Vec<Vec<u8>>,
3306    negatives: &mut Vec<Vec<u8>>,
3307) {
3308    terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3309    if negative {
3310        negatives.push(pattern);
3311    } else {
3312        positives.push(pattern);
3313    }
3314}
3315
3316fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3317    let Some(Value::Object(selections)) = value else {
3318        return Vec::new();
3319    };
3320    let mut filters = Vec::new();
3321    for (field, values) in selections {
3322        if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3323            continue;
3324        }
3325        let Some(values) = parse_string_array(Some(values)) else {
3326            continue;
3327        };
3328        filters.push(ExplorerFilter::new(
3329            field.as_bytes().to_vec(),
3330            values
3331                .into_iter()
3332                .map(|value| normalize_filter_value(field, &value)),
3333        ));
3334    }
3335    filters
3336}
3337
/// Journal sources requested via `__logs_sources`.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitmask of `SOURCE_TYPE_*` category flags; `SOURCE_TYPE_ALL` when no
    /// source list was given.
    source_type: u64,
    /// Requested names that are not known categories, kept for exact matching.
    exact_sources: Vec<String>,
}
3343
3344fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3345    let mut selection = SourceSelection {
3346        source_type: SOURCE_TYPE_ALL,
3347        exact_sources: Vec::new(),
3348    };
3349    let Some(Value::Object(selections)) = value else {
3350        return selection;
3351    };
3352    let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3353        return selection;
3354    };
3355    selection.source_type = 0;
3356    for value in values {
3357        match source_type_for_name(&value) {
3358            Some(source_type) => selection.source_type |= source_type,
3359            None => selection.exact_sources.push(value),
3360        }
3361    }
3362    selection
3363}
3364
3365fn source_type_for_name(value: &str) -> Option<u64> {
3366    match value {
3367        "all" => Some(SOURCE_TYPE_ALL),
3368        "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3369        "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3370        "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3371        "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3372        "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3373        "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3374        _ => None,
3375    }
3376}
3377
3378fn journal_file_source_type(path: &Path) -> u64 {
3379    let text = path.to_string_lossy();
3380    let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3381        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3382    };
3383    if text.contains("/remote/") {
3384        return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3385    }
3386    if local_namespace_source_name(path).is_some() {
3387        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3388    }
3389    if name.starts_with("system") {
3390        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3391    }
3392    if name.starts_with("user") {
3393        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3394    }
3395    SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3396}
3397
/// Derives `namespace-<ns>` from a parent directory named `<something>.<ns>`.
/// Returns `None` when the parent has no dot or the suffix after the last dot
/// is empty.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path
        .parent()
        .and_then(Path::file_name)
        .and_then(|name| name.to_str())?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3403
3404fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3405    let text = path.to_string_lossy();
3406    if text.contains("/remote/") {
3407        let name = path.file_name()?.to_str()?;
3408        let source = name
3409            .split_once('@')
3410            .map(|(prefix, _)| prefix)
3411            .unwrap_or_else(|| {
3412                name.strip_suffix(".journal~.zst")
3413                    .or_else(|| name.strip_suffix(".journal.zst"))
3414                    .or_else(|| name.strip_suffix(".journal~"))
3415                    .or_else(|| name.strip_suffix(".journal"))
3416                    .unwrap_or(name)
3417            });
3418        return source.starts_with("remote-").then(|| source.to_string());
3419    }
3420    local_namespace_source_name(path)
3421}
3422
3423fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3424    if field == "PRIORITY" {
3425        if let Some(priority) = priority_name_to_number(value) {
3426            return priority.as_bytes().to_vec();
3427        }
3428    }
3429    value.as_bytes().to_vec()
3430}
3431
3432fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3433    let Value::Array(items) = value? else {
3434        return None;
3435    };
3436    Some(
3437        items
3438            .iter()
3439            .filter_map(Value::as_str)
3440            .map(ToOwned::to_owned)
3441            .collect(),
3442    )
3443}
3444
3445fn request_direction(object: &Map<String, Value>) -> Direction {
3446    match get_str(object, "direction").unwrap_or("backward") {
3447        "forward" | "forwards" | "next" => Direction::Forward,
3448        _ => Direction::Backward,
3449    }
3450}
3451
3452fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3453    data_only && get_bool(object, "delta").unwrap_or(false)
3454}
3455
3456fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3457    data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3458}
3459
3460fn tail_after_realtime_bound(
3461    after_realtime_usec: Option<u64>,
3462    anchor: ExplorerAnchor,
3463) -> Option<u64> {
3464    let ExplorerAnchor::Realtime(anchor) = anchor else {
3465        return after_realtime_usec;
3466    };
3467    let tail_after = anchor.saturating_add(1);
3468    Some(
3469        after_realtime_usec
3470            .map(|after| after.max(tail_after))
3471            .unwrap_or(tail_after),
3472    )
3473}
3474
3475fn before_realtime_bound_excluding_anchor(
3476    before_realtime_usec: Option<u64>,
3477    anchor: ExplorerAnchor,
3478) -> Option<u64> {
3479    let ExplorerAnchor::Realtime(anchor) = anchor else {
3480        return before_realtime_usec;
3481    };
3482    let before_anchor = anchor.saturating_sub(1);
3483    Some(
3484        before_realtime_usec
3485            .map(|before| before.min(before_anchor))
3486            .unwrap_or(before_anchor),
3487    )
3488}
3489
3490fn request_anchor_and_direction(
3491    object: &Map<String, Value>,
3492    tail: bool,
3493    direction: Direction,
3494    after_realtime_usec: Option<u64>,
3495    before_realtime_usec: Option<u64>,
3496) -> (ExplorerAnchor, Direction) {
3497    let anchor = get_u64(object, "anchor")
3498        .filter(|value| *value != 0)
3499        .map(normalize_timestamp_to_usec)
3500        .map(ExplorerAnchor::Realtime)
3501        .unwrap_or(ExplorerAnchor::Auto);
3502    if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3503        return (anchor, Direction::Backward);
3504    }
3505    if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3506        (ExplorerAnchor::Auto, Direction::Backward)
3507    } else {
3508        (anchor, direction)
3509    }
3510}
3511
3512fn anchor_outside_window(
3513    anchor: ExplorerAnchor,
3514    after_realtime_usec: Option<u64>,
3515    before_realtime_usec: Option<u64>,
3516) -> bool {
3517    let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3518        return false;
3519    };
3520    after_realtime_usec.is_some_and(|after| anchor_usec < after)
3521        || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3522}
3523
3524fn request_limit(object: &Map<String, Value>) -> usize {
3525    get_u64(object, "last")
3526        .filter(|value| *value != 0)
3527        .map(|value| value as usize)
3528        .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3529}
3530
3531fn request_facets(
3532    requested_facets: &Option<Vec<String>>,
3533    config: &NetdataFunctionConfig,
3534) -> Vec<Vec<u8>> {
3535    requested_facets
3536        .clone()
3537        .unwrap_or_else(|| config.default_facets.clone())
3538        .into_iter()
3539        .map(Vec::from)
3540        .collect()
3541}
3542
3543fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3544    get_str(object, "histogram")
3545        .filter(|histogram| !histogram.is_empty())
3546        .map(ToOwned::to_owned)
3547}
3548
3549fn request_histogram_or_default(
3550    requested_histogram: &Option<String>,
3551    config: &NetdataFunctionConfig,
3552) -> Option<String> {
3553    requested_histogram
3554        .clone()
3555        .or_else(|| config.default_histogram.clone())
3556}
3557
3558fn request_query(object: &Map<String, Value>) -> Option<String> {
3559    get_str(object, "query")
3560        .filter(|query| !query.is_empty())
3561        .map(ToOwned::to_owned)
3562}
3563
3564fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3565    object.get(key).and_then(Value::as_bool)
3566}
3567
3568fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3569    object.get(key).and_then(Value::as_i64)
3570}
3571
3572fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3573    object.get(key).and_then(Value::as_u64)
3574}
3575
3576fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3577    object.get(key).and_then(Value::as_str)
3578}
3579
3580fn normalize_time_window(
3581    now_seconds: i64,
3582    after: Option<i64>,
3583    before: Option<i64>,
3584) -> (Option<u64>, Option<u64>) {
3585    let mut after = after.unwrap_or(0);
3586    let mut before = before.unwrap_or(0);
3587
3588    if after == 0 && before == 0 {
3589        before = now_seconds;
3590        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3591    } else {
3592        (after, before) = relative_window_to_absolute(now_seconds, after, before);
3593    }
3594
3595    if after > before {
3596        std::mem::swap(&mut after, &mut before);
3597    }
3598    if after == before {
3599        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3600    }
3601
3602    (
3603        Some(normalize_timestamp_to_usec_with_rounding(
3604            after.max(0) as u64,
3605            false,
3606        )),
3607        Some(normalize_timestamp_to_usec_with_rounding(
3608            before.max(0) as u64,
3609            true,
3610        )),
3611    )
3612}
3613
3614fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3615    let mut after = after;
3616    let mut before = before;
3617
3618    if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3619        if before > 0 {
3620            before = -before;
3621        }
3622        before = now_seconds.saturating_add(before);
3623    }
3624
3625    if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3626        if after > 0 {
3627            after = -after;
3628        }
3629        if after == 0 {
3630            after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3631        }
3632        after = before.saturating_add(after).saturating_add(1);
3633    }
3634
3635    if after > before {
3636        std::mem::swap(&mut after, &mut before);
3637    }
3638
3639    if before > now_seconds {
3640        let delta = before.saturating_sub(now_seconds);
3641        before = before.saturating_sub(delta);
3642        after = after.saturating_sub(delta);
3643    }
3644
3645    (after, before)
3646}
3647
/// Effective (post-normalization) request parameters consumed by
/// `normalized_request_echo` to build the `request` echo.
struct RequestEchoInput<'a> {
    info: bool,
    // Window lower bound in microseconds; echoed as whole seconds (0 when absent).
    after_realtime_usec: Option<u64>,
    // Window upper bound in microseconds; echoed as whole seconds (0 when absent).
    before_realtime_usec: Option<u64>,
    if_modified_since_usec: u64,
    // Only `ExplorerAnchor::Realtime` carries a timestamp; other anchors echo as 0.
    anchor: ExplorerAnchor,
    direction: Direction,
    // Echoed as `last`.
    limit: usize,
    data_only: bool,
    delta: bool,
    tail: bool,
    sampling: u64,
    // Bitmask of `SOURCE_TYPE_*` flags.
    source_type: u64,
    // Facet list as supplied by the request; omitted from the echo when `None`.
    requested_facets: Option<&'a [String]>,
    // Raw `selections` request value; echoed only when it is a JSON object.
    selections: Option<&'a Value>,
    histogram: Option<&'a str>,
    query: Option<&'a str>,
}
3666
/// Builds the normalized `request` object echoed back in the response.
///
/// The echo reports the effective parameters rather than the raw request:
/// - Window bounds are converted from microseconds back to whole seconds.
/// - A non-realtime anchor is reported as `0`.
/// - `direction` is rendered as `"forward"` or `"backward"`.
/// - `facets` is added only when the request supplied a facet list.
/// - `selections` is added only when it is a JSON object, with every
///   `__logs_sources` entry replaced by `null`.
fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
    // Only realtime anchors carry a timestamp; every other anchor echoes as 0.
    let anchor_usec = match input.anchor {
        ExplorerAnchor::Realtime(usec) => usec,
        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
    };
    let mut out = json!({
        "info": input.info,
        // The SDK Netdata boundary always uses indexed slice semantics. The
        // field remains in the echo because it is part of the plugin request
        // shape and downstream fixtures compare normalized requests.
        "slice": true,
        "data_only": input.data_only,
        "delta": input.delta,
        "tail": input.tail,
        "sampling": input.sampling,
        "source_type": input.source_type,
        // Window bounds are stored in usec but echoed in whole seconds.
        "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
        "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
        "if_modified_since": input.if_modified_since_usec,
        "anchor": anchor_usec,
        "direction": match input.direction {
            Direction::Forward => "forward",
            Direction::Backward => "backward",
        },
        "last": input.limit,
        "query": input.query,
        "histogram": input.histogram,
    });
    if let Some(facets) = input.requested_facets {
        if let Some(object) = out.as_object_mut() {
            object.insert(
                "facets".to_string(),
                facets
                    .iter()
                    .map(|field| Value::String(field.clone()))
                    .collect(),
            );
        }
    }
    if let Some(Value::Object(selections)) = input.selections {
        let mut selections = selections.clone();
        // NOTE(review): source entries are blanked to `null` (array length is kept),
        // presumably so the echo does not depend on host-specific source names — confirm.
        if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
            for source in sources {
                *source = Value::Null;
            }
        }
        if let Some(object) = out.as_object_mut() {
            object.insert("selections".to_string(), Value::Object(selections));
        }
    }
    out
}
3719
/// Normalizes a timestamp given in seconds or microseconds to microseconds,
/// rounding to the start of the second.
fn normalize_timestamp_to_usec(value: u64) -> u64 {
    normalize_timestamp_to_usec_with_rounding(value, false)
}

/// Normalizes a timestamp given in seconds or microseconds to microseconds.
///
/// Values of at least `10^12` are assumed to already be in microseconds and
/// are returned unchanged. Smaller values are treated as seconds. They are
/// scaled by `10^6` (saturating), and when `end_of_second` is set they are
/// moved to the last microsecond of that second.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const MICROSECONDS_THRESHOLD: u64 = 1_000_000_000_000;
    if value >= MICROSECONDS_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(1_000_000);
    match end_of_second {
        true => start_of_second.saturating_add(999_999),
        false => start_of_second,
    }
}
3733
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Returns `0` if the system clock is set before the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3740
/// Formats a byte count with binary (1024-based) units, up to TiB.
///
/// Plain bytes are printed exactly (`"1023B"`). Larger values use at most two
/// decimals, with trailing zeros and a dangling dot removed (`"1.5KiB"`,
/// `"1KiB"`).
fn human_binary_size(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];
    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index < UNITS.len() - 1 {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let suffix = UNITS[unit_index];

    if unit_index == 0 {
        return format!("{bytes}{suffix}");
    }
    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{suffix}");
    }
    let two_decimals = format!("{scaled:.2}");
    let trimmed = two_decimals.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{suffix}")
}
3764
/// Formats a duration as space-separated components such as `"1y 1mo 5d"`.
///
/// Years are 365 days and months are 30 days. Zero components are omitted,
/// except that a zero duration renders as `"0s"`.
fn human_duration_seconds(seconds: u64) -> String {
    const SPANS: [(u64, &str); 5] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];
    let mut remaining = seconds;
    let mut parts = Vec::new();
    for (span, suffix) in SPANS {
        let count = remaining / span;
        remaining %= span;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3796
/// Result of scanning a directory tree for journal files
/// (produced by `collect_journal_files`).
#[derive(Debug, Default)]
struct JournalFileCollection {
    /// Journal file paths found; `collect_journal_files` sorts them and
    /// de-duplicates them by canonical path before returning.
    files: Vec<PathBuf>,
    /// Number of directories that were not scanned: unreadable
    /// subdirectories, or directories beyond the scan-count limit.
    skipped: u64,
    /// One human-readable message per skipped directory.
    errors: Vec<String>,
}
3803
/// Recursively collects journal files under `path`.
///
/// The scan is breadth-first and is bounded in two ways:
/// - Subdirectories are followed at most `NETDATA_MAX_DIRECTORY_SCAN_DEPTH`
///   levels below `path`.
/// - At most `NETDATA_MAX_DIRECTORY_SCAN_COUNT` distinct directories are read.
///
/// Directories are keyed by canonical path (falling back to the literal path
/// when canonicalization fails), so a directory reached twice through
/// symlinks is read once. Unreadable subdirectories and directories beyond
/// the scan-count limit are counted in `skipped` and described in `errors`
/// instead of failing the scan. The returned file list is sorted and
/// de-duplicated by canonical path.
///
/// # Errors
///
/// - `SdkError::InvalidPath` when `path` is not a directory.
/// - The `read_dir` error (converted with `into()`) when `path` itself cannot
///   be read.
fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
    if !path.is_dir() {
        return Err(SdkError::InvalidPath(format!(
            "not a directory: {}",
            path.display()
        )));
    }
    let mut collection = JournalFileCollection::default();
    // Queue of (directory, depth below `path`); FIFO order gives a breadth-first scan.
    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
    let mut visited = HashSet::new();
    while let Some((directory, depth)) = pending.pop_front() {
        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
        if visited.contains(&visited_key) {
            continue;
        }
        // Once the directory budget is spent, every further new directory is reported.
        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
            collection.skipped = collection.skipped.saturating_add(1);
            collection.errors.push(format!(
                "{}: directory scan limit reached",
                directory.display()
            ));
            continue;
        }
        visited.insert(visited_key);
        let entries = match std::fs::read_dir(&directory) {
            Ok(entries) => entries,
            // Failing to read the root is fatal; failing to read a subdirectory is not.
            Err(err) if directory == path => return Err(err.into()),
            Err(err) => {
                collection.skipped = collection.skipped.saturating_add(1);
                collection
                    .errors
                    .push(format!("{}: {err}", directory.display()));
                continue;
            }
        };
        // Individual entries that fail to read are skipped silently.
        for entry in entries.flatten() {
            let entry_path = entry.path();
            if entry_path.is_file() && is_journal_file_name(&entry_path) {
                collection.files.push(entry_path);
            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
                pending.push_back((entry_path, depth + 1));
            }
        }
    }
    collection.files.sort();
    dedup_journal_files_by_canonical_path(&mut collection.files);
    Ok(collection)
}
3852
/// Removes later duplicates of the same file, keeping the first occurrence.
///
/// Files are compared by canonical path, falling back to the literal path
/// when canonicalization fails (for example, when the file no longer exists).
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut unique_keys: HashSet<PathBuf> = HashSet::with_capacity(files.len());
    files.retain(|candidate| {
        let identity =
            std::fs::canonicalize(candidate).unwrap_or_else(|_| candidate.to_path_buf());
        unique_keys.insert(identity)
    });
}
3860
/// Timestamps (microseconds since the Unix epoch) used to order journal files
/// before scanning them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime of the file's first entry; `journal_file_order_info` reports
    /// `0` when the file cannot be opened.
    msg_first_realtime_usec: u64,
    /// Realtime of the file's last entry, falling back to the file's
    /// modification time when unknown.
    msg_last_realtime_usec: u64,
    /// File modification time (from metadata when provided, else the filesystem).
    file_last_modified_usec: u64,
    // NOTE(review): presumably the tolerated skew between journal entry timestamps
    // and wall-clock time; it is not used by `compare_journal_file_order` — confirm.
    journal_vs_realtime_delta_usec: u64,
}
3868
3869fn journal_file_order_info(
3870    path: &Path,
3871    reader_options: ReaderOptions,
3872    metadata: Option<&NetdataJournalFileMetadata>,
3873) -> JournalFileOrderInfo {
3874    let file_last_modified_usec = std::fs::metadata(path)
3875        .ok()
3876        .and_then(|metadata| metadata.modified().ok())
3877        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3878        .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3879        .unwrap_or_default();
3880    let file_last_modified_usec = metadata
3881        .and_then(|metadata| metadata.file_last_modified_usec)
3882        .unwrap_or(file_last_modified_usec);
3883    let journal_vs_realtime_delta_usec = metadata
3884        .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3885        .map(normalize_journal_vs_realtime_delta_usec)
3886        .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3887
3888    let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3889        return JournalFileOrderInfo {
3890            msg_first_realtime_usec: 0,
3891            msg_last_realtime_usec: file_last_modified_usec,
3892            file_last_modified_usec,
3893            journal_vs_realtime_delta_usec,
3894        };
3895    };
3896    let header = reader.header();
3897    let msg_first_realtime_usec = metadata
3898        .and_then(|metadata| metadata.msg_first_realtime_usec)
3899        .unwrap_or(header.head_entry_realtime);
3900    let msg_last_realtime_usec = metadata
3901        .and_then(|metadata| metadata.msg_last_realtime_usec)
3902        .unwrap_or_else(|| {
3903            if header.tail_entry_realtime == 0 {
3904                file_last_modified_usec
3905            } else {
3906                header.tail_entry_realtime
3907            }
3908        });
3909    JournalFileOrderInfo {
3910        msg_first_realtime_usec,
3911        msg_last_realtime_usec,
3912        file_last_modified_usec,
3913        journal_vs_realtime_delta_usec,
3914    }
3915}
3916
3917fn compare_journal_file_order(
3918    left: &JournalFileOrderInfo,
3919    right: &JournalFileOrderInfo,
3920    direction: Direction,
3921) -> Ordering {
3922    let backward = right
3923        .msg_last_realtime_usec
3924        .cmp(&left.msg_last_realtime_usec)
3925        .then_with(|| {
3926            right
3927                .file_last_modified_usec
3928                .cmp(&left.file_last_modified_usec)
3929        })
3930        .then_with(|| {
3931            right
3932                .msg_first_realtime_usec
3933                .cmp(&left.msg_first_realtime_usec)
3934        });
3935    match direction {
3936        Direction::Backward => backward,
3937        Direction::Forward => backward.reverse(),
3938    }
3939}
3940
/// Reports whether the file name has a journal extension: `.journal`,
/// `.journal~`, or either of those followed by `.zst`.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] = [".journal", ".journal~", ".journal.zst", ".journal~.zst"];
    let Some(file_name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    JOURNAL_SUFFIXES
        .iter()
        .any(|suffix| file_name.ends_with(*suffix))
}
3951
/// Appends each value not already present in `target`, preserving order.
fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
    values
        .iter()
        .for_each(|value| push_unique(target, value));
}

/// Converts raw field names to `String`s, dropping any that are not valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    fields
        .iter()
        .filter_map(|bytes| std::str::from_utf8(bytes).ok().map(str::to_owned))
        .collect()
}

/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    if target.iter().all(|existing| existing != candidate) {
        target.push(candidate.to_owned());
    }
}
3971
/// Sort key used when reordering names: leading ASCII punctuation is dropped
/// and ASCII letters are lowercased. Non-ASCII characters are untouched.
fn netdata_reorder_key(value: &str) -> String {
    value
        .chars()
        .skip_while(char::is_ascii_punctuation)
        .map(|character| character.to_ascii_lowercase())
        .collect()
}
3977
3978fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
3979    histogram
3980        .buckets
3981        .first()
3982        .map(|bucket| {
3983            bucket
3984                .end_realtime_usec
3985                .saturating_sub(bucket.start_realtime_usec)
3986                .checked_div(1_000_000)
3987                .unwrap_or(1)
3988                .max(1)
3989        })
3990        .unwrap_or(1)
3991}
3992
/// Output precision for `format_realtime_usec`.
enum TimestampPrecision {
    /// Whole seconds: `%Y-%m-%dT%H:%M:%SZ`.
    Seconds,
    /// Six fractional digits: `%Y-%m-%dT%H:%M:%S%.6fZ`.
    Micros,
}
3997
3998fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
3999    let seconds = (timestamp / 1_000_000) as i64;
4000    let micros = (timestamp % 1_000_000) as u32;
4001    DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4002        .map(|datetime| match precision {
4003            TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4004            TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4005        })
4006        .unwrap_or_else(|| timestamp.to_string())
4007}
4008
/// Maps a numeric syslog priority (`"0"`..`"7"`) to its display name.
fn priority_name(raw: &str) -> Option<&'static str> {
    const NAMES: [&str; 8] = [
        "panic", "alert", "critical", "error", "warning", "notice", "info", "debug",
    ];
    NAMES.get(usize::from(parse_priority(raw)?)).copied()
}

/// Maps a symbolic priority name or alias to its numeric string.
/// Matching is exact and case-sensitive.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: [(&str, &str); 13] = [
        ("panic", "0"),
        ("emergency", "0"),
        ("emerg", "0"),
        ("alert", "1"),
        ("critical", "2"),
        ("crit", "2"),
        ("error", "3"),
        ("err", "3"),
        ("warning", "4"),
        ("warn", "4"),
        ("notice", "5"),
        ("info", "6"),
        ("debug", "7"),
    ];
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == value)
        .map(|&(_, number)| number)
}

/// Parses a priority as a `u8`; anything else (including whitespace) yields `None`.
fn parse_priority(raw: &str) -> Option<u8> {
    raw.parse().ok()
}

/// Buckets a raw `PRIORITY` value into a row severity class.
///
/// Priorities 0–3 are `"critical"`, 4 is `"warning"`, 5 is `"notice"` and 7
/// or higher is `"debug"`. Priority 6 and unparsable values are `"normal"`.
fn priority_to_row_severity(raw: &[u8]) -> &'static str {
    let text = String::from_utf8_lossy(raw);
    match parse_priority(&text) {
        Some(0..=3) => "critical",
        Some(4) => "warning",
        Some(5) => "notice",
        Some(7..=u8::MAX) => "debug",
        Some(6) | None => "normal",
    }
}
4051
/// Maps a numeric `SYSLOG_FACILITY` value to its conventional name.
///
/// Only codes 0–11 and the `local0`–`local7` range (16–23) are named. Every
/// other code, and any non-numeric input, yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    const STANDARD: [&str; 12] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron",
        "authpriv", "ftp",
    ];
    const LOCAL: [&str; 8] = [
        "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7",
    ];
    let code = usize::from(raw.parse::<u8>().ok()?);
    STANDARD
        .get(code)
        .or_else(|| code.checked_sub(16).and_then(|offset| LOCAL.get(offset)))
        .copied()
}
4077
/// Linux `errno` values and their symbolic names, in ascending numeric order.
/// Codes 41 and 58 are intentionally absent (not assigned).
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"),
    (2, "ENOENT"),
    (3, "ESRCH"),
    (4, "EINTR"),
    (5, "EIO"),
    (6, "ENXIO"),
    (7, "E2BIG"),
    (8, "ENOEXEC"),
    (9, "EBADF"),
    (10, "ECHILD"),
    (11, "EAGAIN"),
    (12, "ENOMEM"),
    (13, "EACCES"),
    (14, "EFAULT"),
    (15, "ENOTBLK"),
    (16, "EBUSY"),
    (17, "EEXIST"),
    (18, "EXDEV"),
    (19, "ENODEV"),
    (20, "ENOTDIR"),
    (21, "EISDIR"),
    (22, "EINVAL"),
    (23, "ENFILE"),
    (24, "EMFILE"),
    (25, "ENOTTY"),
    (26, "ETXTBSY"),
    (27, "EFBIG"),
    (28, "ENOSPC"),
    (29, "ESPIPE"),
    (30, "EROFS"),
    (31, "EMLINK"),
    (32, "EPIPE"),
    (33, "EDOM"),
    (34, "ERANGE"),
    (35, "EDEADLK"),
    (36, "ENAMETOOLONG"),
    (37, "ENOLCK"),
    (38, "ENOSYS"),
    (39, "ENOTEMPTY"),
    (40, "ELOOP"),
    (42, "ENOMSG"),
    (43, "EIDRM"),
    (44, "ECHRNG"),
    (45, "EL2NSYNC"),
    (46, "EL3HLT"),
    (47, "EL3RST"),
    (48, "ELNRNG"),
    (49, "EUNATCH"),
    (50, "ENOCSI"),
    (51, "EL2HLT"),
    (52, "EBADE"),
    (53, "EBADR"),
    (54, "EXFULL"),
    (55, "ENOANO"),
    (56, "EBADRQC"),
    (57, "EBADSLT"),
    (59, "EBFONT"),
    (60, "ENOSTR"),
    (61, "ENODATA"),
    (62, "ETIME"),
    (63, "ENOSR"),
    (64, "ENONET"),
    (65, "ENOPKG"),
    (66, "EREMOTE"),
    (67, "ENOLINK"),
    (68, "EADV"),
    (69, "ESRMNT"),
    (70, "ECOMM"),
    (71, "EPROTO"),
    (72, "EMULTIHOP"),
    (73, "EDOTDOT"),
    (74, "EBADMSG"),
    (75, "EOVERFLOW"),
    (76, "ENOTUNIQ"),
    (77, "EBADFD"),
    (78, "EREMCHG"),
    (79, "ELIBACC"),
    (80, "ELIBBAD"),
    (81, "ELIBSCN"),
    (82, "ELIBMAX"),
    (83, "ELIBEXEC"),
    (84, "EILSEQ"),
    (85, "ERESTART"),
    (86, "ESTRPIPE"),
    (87, "EUSERS"),
    (88, "ENOTSOCK"),
    (89, "EDESTADDRREQ"),
    (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"),
    (92, "ENOPROTOOPT"),
    (93, "EPROTONOSUPPORT"),
    (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"),
    (96, "EPFNOSUPPORT"),
    (97, "EAFNOSUPPORT"),
    (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"),
    (100, "ENETDOWN"),
    (101, "ENETUNREACH"),
    (102, "ENETRESET"),
    (103, "ECONNABORTED"),
    (104, "ECONNRESET"),
    (105, "ENOBUFS"),
    (106, "EISCONN"),
    (107, "ENOTCONN"),
    (108, "ESHUTDOWN"),
    (109, "ETOOMANYREFS"),
    (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"),
    (112, "EHOSTDOWN"),
    (113, "EHOSTUNREACH"),
    (114, "EALREADY"),
    (115, "EINPROGRESS"),
    (116, "ESTALE"),
    (117, "EUCLEAN"),
    (118, "ENOTNAM"),
    (119, "ENAVAIL"),
    (120, "EISNAM"),
    (121, "EREMOTEIO"),
    (122, "EDQUOT"),
    (123, "ENOMEDIUM"),
    (124, "EMEDIUMTYPE"),
    (125, "ECANCELED"),
    (126, "ENOKEY"),
    (127, "EKEYEXPIRED"),
    (128, "EKEYREVOKED"),
    (129, "EKEYREJECTED"),
    (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"),
    (132, "ERFKILL"),
    (133, "EHWPOISON"),
];

/// Renders a numeric `ERRNO` value as `"<code> (<NAME>)"`.
///
/// The code is re-printed from its parsed value, so leading zeros are dropped.
/// Returns `None` for non-numeric or unknown codes.
fn errno_name(raw: &str) -> Option<String> {
    let code: u32 = raw.parse().ok()?;
    ERRNO_NAMES
        .iter()
        .find(|(number, _)| *number == code)
        .map(|(number, symbol)| format!("{number} ({symbol})"))
}
4219
/// Annotates a hexadecimal `_CAP_EFFECTIVE` bitmask with capability names.
///
/// Produces `"<raw> (NAME | NAME ...)"`, listing names in bit order. The raw
/// text is returned unchanged when:
/// - it does not start with an ASCII digit,
/// - it does not parse as hexadecimal `u64`,
/// - it is zero, or
/// - no known capability bit is set.
fn cap_effective_display(raw: &str) -> String {
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];
    let starts_with_digit = raw.as_bytes().first().is_some_and(u8::is_ascii_digit);
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if starts_with_digit && mask != 0 => mask,
        _ => return raw.to_owned(),
    };
    let joined = CAPABILITIES
        .iter()
        .enumerate()
        .filter(|(bit, _)| ((mask >> bit) & 1) == 1)
        .map(|(_, name)| *name)
        .collect::<Vec<_>>()
        .join(" | ");
    if joined.is_empty() {
        raw.to_owned()
    } else {
        format!("{raw} ({joined})")
    }
}
4284
/// Converts a raw journal field value into the JSON string shown to users.
///
/// Known systemd fields get human-friendly renderings: priority and syslog
/// facility names, `errno` symbols, message-id descriptions, boot start
/// times, user/group names, capability names and formatted source
/// timestamps. Any other field is the value decoded as lossy UTF-8.
///
/// For `MESSAGE_ID` and `_BOOT_ID`, `scope` selects between two forms:
/// - the row form (`DisplayScope::Data`) keeps the raw value next to the
///   annotation;
/// - the facet/histogram form shows only the annotation.
///
/// UID/GID fields are resolved to names only when `resolve_user_group_names`
/// is set.
fn systemd_field_display_value(
    context: &DisplayContext,
    scope: DisplayScope,
    field: &str,
    value: &[u8],
    resolve_user_group_names: bool,
) -> Value {
    let raw = String::from_utf8_lossy(value);
    match field {
        // Unknown codes fall back to the raw text.
        "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
        "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
        "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
        "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
            (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
            (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
            (None, _) => raw.into_owned(),
        }),
        // Boot ids are annotated with the first realtime timestamp seen for that boot.
        "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
            // NOTE(review): this format ends with two trailing spaces, presumably kept
            // for parity with upstream Netdata output — confirm before changing.
            (Some(timestamp), DisplayScope::Data) => format!(
                "{} ({})  ",
                raw,
                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
            ),
            (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
            }
            (None, _) => raw.into_owned(),
        }),
        "_UID"
        | "_SYSTEMD_OWNER_UID"
        | "OBJECT_SYSTEMD_OWNER_UID"
        | "OBJECT_UID"
        | "_AUDIT_LOGINUID"
        | "OBJECT_AUDIT_LOGINUID" => {
            if resolve_user_group_names {
                Value::String(cached_uid_display(context, raw.as_ref()))
            } else {
                Value::String(raw.into_owned())
            }
        }
        "_GID" | "OBJECT_GID" => {
            if resolve_user_group_names {
                Value::String(cached_gid_display(context, raw.as_ref()))
            } else {
                Value::String(raw.into_owned())
            }
        }
        "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
        // Non-zero numeric timestamps get a microsecond-precision date appended.
        "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
            Ok(timestamp) if timestamp != 0 => {
                format!(
                    "{} ({})",
                    raw,
                    format_realtime_usec(timestamp, TimestampPrecision::Micros)
                )
            }
            _ => raw.into_owned(),
        }),
        _ => Value::String(raw.into_owned()),
    }
}
4346
4347fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4348    if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4349        return value.clone();
4350    }
4351    let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4352    context
4353        .uid_display_cache
4354        .borrow_mut()
4355        .insert(raw.to_string(), value.clone());
4356    value
4357}
4358
4359fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4360    if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4361        return value.clone();
4362    }
4363    let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4364    context
4365        .gid_display_cache
4366        .borrow_mut()
4367        .insert(raw.to_string(), value.clone());
4368    value
4369}
4370
4371#[cfg(unix)]
4372fn resolve_uid_name(raw: &str) -> Option<String> {
4373    let uid = raw.parse::<libc::uid_t>().ok()?;
4374    let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4375    let mut result = std::ptr::null_mut();
4376    let mut buffer = vec![0i8; 16_384];
4377    let rc = unsafe {
4378        libc::getpwuid_r(
4379            uid,
4380            pwd.as_mut_ptr(),
4381            buffer.as_mut_ptr(),
4382            buffer.len(),
4383            &mut result,
4384        )
4385    };
4386    if rc != 0 || result.is_null() {
4387        return None;
4388    }
4389    let pwd = unsafe { pwd.assume_init() };
4390    Some(
4391        unsafe { CStr::from_ptr(pwd.pw_name) }
4392            .to_string_lossy()
4393            .into_owned(),
4394    )
4395}
4396
4397#[cfg(not(unix))]
4398fn resolve_uid_name(_raw: &str) -> Option<String> {
4399    None
4400}
4401
4402#[cfg(unix)]
4403fn resolve_gid_name(raw: &str) -> Option<String> {
4404    let gid = raw.parse::<libc::gid_t>().ok()?;
4405    let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4406    let mut result = std::ptr::null_mut();
4407    let mut buffer = vec![0i8; 16_384];
4408    let rc = unsafe {
4409        libc::getgrgid_r(
4410            gid,
4411            grp.as_mut_ptr(),
4412            buffer.as_mut_ptr(),
4413            buffer.len(),
4414            &mut result,
4415        )
4416    };
4417    if rc != 0 || result.is_null() {
4418        return None;
4419    }
4420    let grp = unsafe { grp.assume_init() };
4421    Some(
4422        unsafe { CStr::from_ptr(grp.gr_name) }
4423            .to_string_lossy()
4424            .into_owned(),
4425    )
4426}
4427
4428#[cfg(not(unix))]
4429fn resolve_gid_name(_raw: &str) -> Option<String> {
4430    None
4431}
4432
/// Human-readable names for well-known journal `MESSAGE_ID` values.
///
/// Each entry is `(id, name)`, where `id` is the 32-character lowercase hex form of the
/// message ID. `message_id_name` compares IDs byte-for-byte, so IDs logged in another form
/// (uppercase, or dashed UUID style) will not match.
///
/// When two entries share an ID, the first one wins, because lookup is a linear scan.
///
/// NOTE(review): the systemd entries appear to mirror the `SD_MESSAGE_*` IDs from
/// `<systemd/sd-messages.h>`; confirm against upstream when adding or editing entries.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    // systemd and its bundled components.
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    // D-Bus.
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    // GNOME, Flatpak/Flathub and boltd.
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    // Netdata's own message IDs.
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4748
4749fn message_id_name(raw: &str) -> Option<&'static str> {
4750    MESSAGE_ID_NAMES
4751        .iter()
4752        .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4753}
4754
4755#[cfg(test)]
4756mod tests {
4757    use super::*;
4758    use crate::ExplorerHistogramBucket;
4759    use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4760    use journal_core::repository::File as RepoFile;
4761    use std::cell::Cell;
4762    use std::collections::HashMap;
4763    use tempfile::TempDir;
4764
    /// In-memory `NetdataFunctionState` test double: serves canned per-file metadata and
    /// records every journal-vs-realtime delta update it receives.
    #[derive(Default)]
    struct TestNetdataState {
        // Metadata returned by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        // Every `(path, delta_usec)` passed to the update hook, in call order.
        updates: Vec<(PathBuf, u64)>,
    }
4770
4771    impl NetdataFunctionState for TestNetdataState {
4772        fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4773            self.metadata.get(path).cloned()
4774        }
4775
4776        fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4777            self.updates.push((path.to_path_buf(), delta_usec));
4778        }
4779    }
4780
4781    fn test_uuid(seed: u8) -> uuid::Uuid {
4782        uuid::Uuid::from_bytes([seed; 16])
4783    }
4784
4785    fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4786        write_named_netdata_test_journal(
4787            directory,
4788            "netdata-api-test.journal",
4789            count,
4790            1_700_000_000_000_000,
4791        );
4792    }
4793
4794    fn write_named_netdata_test_journal(
4795        directory: &std::path::Path,
4796        name: &str,
4797        count: usize,
4798        start_realtime_usec: u64,
4799    ) {
4800        write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4801    }
4802
4803    fn write_stepped_netdata_test_journal(
4804        directory: &std::path::Path,
4805        name: &str,
4806        count: usize,
4807        start_realtime_usec: u64,
4808        step_realtime_usec: u64,
4809    ) {
4810        std::fs::create_dir_all(directory).expect("create journal dir");
4811        let path = directory.join(name);
4812        let repo_file = RepoFile::from_path(&path).expect("repo file");
4813        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4814        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4815        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4816        for index in 0..count {
4817            let message = format!("MESSAGE=row-{index}");
4818            let service = if index % 2 == 0 {
4819                b"SERVICE=even".as_slice()
4820            } else {
4821                b"SERVICE=odd".as_slice()
4822            };
4823            let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4824            let realtime = start_realtime_usec
4825                .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4826            writer
4827                .add_entry(&mut file, &payloads, realtime, realtime)
4828                .expect("write entry");
4829        }
4830        file.sync().expect("sync journal");
4831    }
4832
    /// Rows accumulated across every page fetched by `collect_netdata_pages`, in page order.
    struct NetdataCollectedPages {
        // `MESSAGE` column values from each page.
        messages: Vec<String>,
        // `timestamp` column values, index-aligned with `messages`.
        timestamps: Vec<u64>,
    }
4837
4838    fn collect_netdata_pages(
4839        directory: &std::path::Path,
4840        direction: Direction,
4841        page_size: usize,
4842    ) -> NetdataCollectedPages {
4843        let mut out = NetdataCollectedPages {
4844            messages: Vec::new(),
4845            timestamps: Vec::new(),
4846        };
4847        let mut anchor = None;
4848        for page in 0..100 {
4849            let mut request = json!({
4850                "after": 1_700_000_000,
4851                "before": 1_800_000_000,
4852                "last": page_size,
4853                "direction": direction_name(direction),
4854                "data_only": true,
4855            });
4856            if let Some(anchor) = anchor {
4857                request["anchor"] = Value::from(anchor);
4858            }
4859            let response = run_netdata_contract_request(directory, request);
4860            assert_eq!(
4861                response["status"], 200,
4862                "page {page} response = {response:#}"
4863            );
4864            let messages = response_column_strings(&response, "MESSAGE");
4865            let timestamps = response_column_u64s(&response, "timestamp");
4866            assert_eq!(messages.len(), timestamps.len());
4867            if messages.is_empty() {
4868                break;
4869            }
4870            out.messages.extend(messages);
4871            out.timestamps.extend(timestamps.iter().copied());
4872            anchor = Some(match direction {
4873                Direction::Forward => timestamps[0],
4874                Direction::Backward => *timestamps.last().expect("page timestamp"),
4875            });
4876            if timestamps.len() < page_size {
4877                break;
4878            }
4879        }
4880        assert!(!out.messages.is_empty(), "pagination returned no rows");
4881        out
4882    }
4883
4884    fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4885        NetdataJournalFunction::systemd_journal_plugin_compatible()
4886            .run_directory_request_json_with_options(
4887                directory,
4888                &request,
4889                NetdataFunctionRunOptions::from_timeout_seconds(0),
4890            )
4891            .expect("run function")
4892    }
4893
4894    fn direction_name(direction: Direction) -> &'static str {
4895        match direction {
4896            Direction::Forward => "forward",
4897            Direction::Backward => "backward",
4898        }
4899    }
4900
4901    fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4902        let index = response_column_index(response, field);
4903        response["data"]
4904            .as_array()
4905            .expect("data")
4906            .iter()
4907            .map(|row| {
4908                row.as_array().expect("row")[index]
4909                    .as_str()
4910                    .expect("string cell")
4911                    .to_string()
4912            })
4913            .collect()
4914    }
4915
4916    fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4917        let index = response_column_index(response, field);
4918        response["data"]
4919            .as_array()
4920            .expect("data")
4921            .iter()
4922            .map(|row| {
4923                row.as_array().expect("row")[index]
4924                    .as_u64()
4925                    .expect("u64 cell")
4926            })
4927            .collect()
4928    }
4929
4930    fn response_column_index(response: &Value, field: &str) -> usize {
4931        response["columns"][field]["index"]
4932            .as_u64()
4933            .expect("column index") as usize
4934    }
4935
4936    fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4937        for facet in response[key].as_array().expect("facets") {
4938            if facet["id"] != field {
4939                continue;
4940            }
4941            for option in facet["options"].as_array().expect("facet options") {
4942                if option["id"] == value {
4943                    return option["count"].as_u64().expect("facet count");
4944                }
4945            }
4946            panic!("{key} facet {field} missing value {value}");
4947        }
4948        panic!("{key} missing facet {field}");
4949    }
4950
4951    fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4952        let chart = &response[key]["chart"];
4953        let names = chart["view"]["dimensions"]["names"]
4954            .as_array()
4955            .expect("histogram names");
4956        let dimension_index = names
4957            .iter()
4958            .position(|name| name.as_str() == Some(value))
4959            .expect("histogram dimension");
4960        chart["result"]["data"]
4961            .as_array()
4962            .expect("histogram data")
4963            .iter()
4964            .map(|point| {
4965                point.as_array().expect("histogram point")[dimension_index + 1]
4966                    .as_array()
4967                    .expect("histogram cell")[0]
4968                    .as_u64()
4969                    .unwrap_or_default()
4970            })
4971            .sum()
4972    }
4973
4974    fn assert_unique_messages(messages: &[String]) {
4975        let mut seen = HashSet::new();
4976        for message in messages {
4977            assert!(
4978                seen.insert(message),
4979                "duplicate message {message} in {messages:?}"
4980            );
4981        }
4982    }
4983
4984    #[test]
4985    fn parses_netdata_selections_as_and_fields_or_values() {
4986        let request = json!({
4987            "after": 200_000_000,
4988            "before": 200_000_100,
4989            "direction": "forward",
4990            "last": 25,
4991            "facets": ["PRIORITY"],
4992            "selections": {
4993                "PRIORITY": ["warning", "error"],
4994                "_HOSTNAME": ["node-a"],
4995                "__logs_sources": ["all-local-system-logs"],
4996            }
4997        });
4998
4999        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5000            .expect("parse request");
5001        assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5002        assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5003        assert_eq!(parsed.direction, Direction::Forward);
5004        assert_eq!(parsed.limit, 25);
5005        assert_eq!(parsed.filters.len(), 2);
5006        assert_eq!(parsed.filters[0].field, b"PRIORITY");
5007        assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5008        assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5009        assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5010    }
5011
5012    #[test]
5013    fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5014        let request = json!({
5015            "after": 200_000_000,
5016            "before": 200_000_100,
5017            "last": 1
5018        });
5019
5020        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5021            .expect("parse request");
5022        let query =
5023            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5024
5025        assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5026        assert_eq!(parsed.limit, 2);
5027        assert_eq!(query.limit, 2);
5028    }
5029
5030    #[test]
5031    fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5032        let request = json!({
5033            "after": 200_000_000,
5034            "before": 200_000_100,
5035            "facets": ["PRIORITY"],
5036            "selections": {
5037                "PRIORITY": ["warning"]
5038            }
5039        });
5040
5041        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5042            .expect("parse request");
5043        let query =
5044            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5045
5046        assert!(!query.exclude_facet_field_filters);
5047    }
5048
5049    #[test]
5050    fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5051        let request = json!({
5052            "after": 200_000_000,
5053            "before": 200_000_100,
5054            "facets": ["PRIORITY", "_BOOT_ID"],
5055            "selections": {
5056                "PRIORITY": ["warning"],
5057                "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5058            }
5059        });
5060
5061        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5062            .expect("parse request");
5063        let query =
5064            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5065
5066        assert!(query.exclude_facet_field_filters);
5067    }
5068
5069    #[test]
5070    fn parses_netdata_fts_query_like_simple_pattern() {
5071        let (terms, positives, negatives) =
5072            parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5073
5074        assert_eq!(
5075            positives,
5076            vec![
5077                b"error".to_vec(),
5078                b"warning".to_vec(),
5079                b"escaped|pipe".to_vec(),
5080                b"!literal".to_vec(),
5081                b" a*B".to_vec(),
5082            ]
5083        );
5084        assert_eq!(negatives, vec![b"debug".to_vec()]);
5085        assert_eq!(terms.len(), 6);
5086        assert!(!terms[0].negative);
5087        assert!(terms[2].negative);
5088        assert_eq!(
5089            terms[5],
5090            ExplorerFtsPattern {
5091                parts: vec![b" a".to_vec(), b"B".to_vec()],
5092                negative: false,
5093            }
5094        );
5095
5096        let request = json!({
5097            "query": r"alpha|!debug|needle\|pipe",
5098            "facets": ["PRIORITY"],
5099        });
5100        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5101            .expect("parse request");
5102        let query =
5103            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5104        assert_eq!(
5105            query.fts_patterns,
5106            vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5107        );
5108        assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5109        assert_eq!(query.fts_terms.len(), 3);
5110    }
5111
5112    #[test]
5113    fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5114        let request = json!({
5115            "facets": ["PRIORITY", "_HOSTNAME"],
5116            "histogram": "PRIORITY",
5117            "last": 25
5118        });
5119
5120        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5121            .expect("parse request");
5122        let query =
5123            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5124
5125        assert!(!query.debug_collect_column_fields_by_row_traversal);
5126    }
5127
5128    #[test]
5129    fn netdata_function_suppresses_absent_requested_facet_groups() {
5130        let dir = TempDir::new().expect("tempdir");
5131        write_netdata_test_journal(dir.path(), 10);
5132        let request = json!({
5133            "after": 1_700_000_000,
5134            "before": 1_700_000_010,
5135            "facets": ["SERVICE", "MISSING_FIELD"],
5136            "histogram": "SERVICE",
5137            "last": 5,
5138            "slice": true
5139        });
5140        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5141
5142        let response = function
5143            .run_directory_request_json_with_options(
5144                dir.path(),
5145                &request,
5146                NetdataFunctionRunOptions::from_timeout_seconds(0),
5147            )
5148            .expect("run function");
5149
5150        let columns = response["columns"].as_object().expect("columns");
5151        assert!(columns.contains_key("SERVICE"));
5152        assert!(!columns.contains_key("MISSING_FIELD"));
5153        let facets = response["facets"].as_array().expect("facets");
5154        assert_eq!(
5155            facets
5156                .iter()
5157                .filter_map(|facet| facet["id"].as_str())
5158                .collect::<Vec<_>>(),
5159            vec!["SERVICE"]
5160        );
5161        let accepted = response["accepted_params"]
5162            .as_array()
5163            .expect("accepted params");
5164        assert!(accepted.iter().any(|value| value == "SERVICE"));
5165        assert!(!accepted.iter().any(|value| value == "MISSING_FIELD"));
5166        let histograms = response["available_histograms"]
5167            .as_array()
5168            .expect("available histograms");
5169        assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5170        assert!(
5171            !histograms
5172                .iter()
5173                .any(|value| value["id"] == "MISSING_FIELD")
5174        );
5175    }
5176
5177    #[test]
5178    fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5179        let dir = TempDir::new().expect("tempdir");
5180        write_netdata_test_journal(dir.path(), 10);
5181        let request = json!({
5182            "after": 1_700_000_000,
5183            "before": 1_700_000_010,
5184            "facets": ["PRIORITY"],
5185            "histogram": "PRIORITY",
5186            "selections": {
5187                "SERVICE": ["missing"]
5188            },
5189            "last": 5,
5190            "slice": true
5191        });
5192        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5193
5194        let response = function
5195            .run_directory_request_json_with_options(
5196                dir.path(),
5197                &request,
5198                NetdataFunctionRunOptions::from_timeout_seconds(0),
5199            )
5200            .expect("run function");
5201
5202        let facets = response["facets"].as_array().expect("facets");
5203        assert_eq!(facets.len(), 1);
5204        assert_eq!(facets[0]["id"], "PRIORITY");
5205        let options = facets[0]["options"].as_array().expect("options");
5206        assert!(options.iter().any(|option| {
5207            option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5208        }));
5209        assert_eq!(response["items"]["matched"], 0);
5210    }
5211
5212    #[test]
5213    fn netdata_function_api_reports_progress() {
5214        let dir = TempDir::new().expect("tempdir");
5215        write_netdata_test_journal(dir.path(), 9_000);
5216        let request = json!({
5217            "after": 1_700_000_000,
5218            "before": 1_700_000_010,
5219            "facets": ["SERVICE"],
5220            "histogram": "SERVICE",
5221            "last": 0
5222        });
5223        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5224        let mut reports = 0u64;
5225        let mut progress = |progress: NetdataFunctionProgress| {
5226            reports = reports.saturating_add(1);
5227            assert_eq!(progress.current_file, 1);
5228            assert_eq!(progress.total_files, 1);
5229            assert!(progress.stats.rows_examined <= 9_000);
5230        };
5231        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5232        options.progress_interval = Duration::ZERO;
5233        options.progress_callback = Some(&mut progress);
5234
5235        let response = function
5236            .run_directory_request_json_with_options(dir.path(), &request, options)
5237            .expect("run function");
5238
5239        assert_eq!(response["status"], 200);
5240        assert!(reports > 0);
5241        assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5242    }
5243
5244    #[test]
5245    fn netdata_function_api_reports_file_end_progress_for_small_scans() {
5246        let dir = TempDir::new().expect("tempdir");
5247        write_netdata_test_journal(dir.path(), 10);
5248        let request = json!({
5249            "after": 1_700_000_000,
5250            "before": 1_700_000_010,
5251            "facets": ["SERVICE"],
5252            "histogram": "SERVICE",
5253            "last": 0
5254        });
5255        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5256        let mut reports = 0u64;
5257        let mut last_rows_examined = 0u64;
5258        let mut progress = |progress: NetdataFunctionProgress| {
5259            reports = reports.saturating_add(1);
5260            last_rows_examined = progress.stats.rows_examined;
5261        };
5262        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5263        options.progress_callback = Some(&mut progress);
5264
5265        let response = function
5266            .run_directory_request_json_with_options(dir.path(), &request, options)
5267            .expect("run function");
5268
5269        assert_eq!(response["status"], 200);
5270        assert_eq!(reports, 1);
5271        assert_eq!(last_rows_examined, 10);
5272    }
5273
5274    #[test]
5275    fn netdata_function_progress_counts_only_query_files() {
5276        let dir = TempDir::new().expect("tempdir");
5277        write_named_netdata_test_journal(
5278            dir.path(),
5279            "old-window.journal",
5280            10,
5281            1_600_000_000_000_000,
5282        );
5283        write_named_netdata_test_journal(
5284            dir.path(),
5285            "current-window.journal",
5286            10,
5287            1_700_000_000_000_000,
5288        );
5289        let request = json!({
5290            "after": 1_700_000_000,
5291            "before": 1_700_000_010,
5292            "facets": ["SERVICE"],
5293            "histogram": "SERVICE",
5294            "last": 0
5295        });
5296        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5297        let mut reports = Vec::new();
5298        let mut progress = |progress: NetdataFunctionProgress| {
5299            reports.push((progress.current_file, progress.total_files));
5300        };
5301        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5302        options.progress_callback = Some(&mut progress);
5303
5304        let response = function
5305            .run_directory_request_json_with_options(dir.path(), &request, options)
5306            .expect("run function");
5307
5308        assert_eq!(response["status"], 200);
5309        assert_eq!(response["_journal_files"]["matched"], 1);
5310        assert_eq!(reports, vec![(1, 1)]);
5311    }
5312
5313    #[test]
5314    fn netdata_function_api_reports_cancellation() {
5315        let dir = TempDir::new().expect("tempdir");
5316        write_netdata_test_journal(dir.path(), 9_000);
5317        let request = json!({
5318            "after": 1_700_000_000,
5319            "before": 1_700_000_010,
5320            "facets": ["SERVICE"],
5321            "histogram": "SERVICE",
5322            "last": 0
5323        });
5324        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5325        let is_cancelled = || true;
5326        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5327        options.cancellation_callback = Some(&is_cancelled);
5328
5329        let response = function
5330            .run_directory_request_json_with_options(dir.path(), &request, options)
5331            .expect("run function");
5332
5333        assert_eq!(response["status"], 499);
5334        assert_eq!(response["errorMessage"], "Request cancelled.");
5335        assert_eq!(
5336            response.as_object().expect("response object").len(),
5337            2,
5338            "plugin-compatible function errors only include status and errorMessage"
5339        );
5340    }
5341
5342    #[test]
5343    fn netdata_function_api_cancels_during_active_scan() {
5344        let dir = TempDir::new().expect("tempdir");
5345        write_netdata_test_journal(dir.path(), 9_000);
5346        let request = json!({
5347            "after": 1_700_000_000,
5348            "before": 1_700_000_010,
5349            "facets": ["SERVICE"],
5350            "histogram": "SERVICE",
5351            "last": 0
5352        });
5353        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5354        let should_cancel = Cell::new(false);
5355        let mut reports = 0u64;
5356        let mut progress = |progress: NetdataFunctionProgress| {
5357            reports = reports.saturating_add(1);
5358            if progress.stats.rows_examined > 0 {
5359                should_cancel.set(true);
5360            }
5361        };
5362        let is_cancelled = || should_cancel.get();
5363        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5364        options.progress_interval = Duration::ZERO;
5365        options.progress_callback = Some(&mut progress);
5366        options.cancellation_callback = Some(&is_cancelled);
5367
5368        let response = function
5369            .run_directory_request_json_with_options(dir.path(), &request, options)
5370            .expect("run function");
5371
5372        assert_eq!(response["status"], 499);
5373        assert_eq!(response["errorMessage"], "Request cancelled.");
5374        assert!(reports > 0);
5375        assert!(should_cancel.get());
5376    }
5377
5378    #[test]
5379    fn netdata_function_api_honors_cancellation_after_final_file_progress() {
5380        let dir = TempDir::new().expect("tempdir");
5381        write_netdata_test_journal(dir.path(), 10);
5382        let request = json!({
5383            "after": 1_700_000_000,
5384            "before": 1_700_000_010,
5385            "facets": ["SERVICE"],
5386            "histogram": "SERVICE",
5387            "last": 0
5388        });
5389        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5390        let should_cancel = Cell::new(false);
5391        let mut progress = |_progress: NetdataFunctionProgress| {
5392            should_cancel.set(true);
5393        };
5394        let is_cancelled = || should_cancel.get();
5395        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5396        options.progress_callback = Some(&mut progress);
5397        options.cancellation_callback = Some(&is_cancelled);
5398
5399        let response = function
5400            .run_directory_request_json_with_options(dir.path(), &request, options)
5401            .expect("run function");
5402
5403        assert_eq!(response["status"], 499);
5404        assert_eq!(response["errorMessage"], "Request cancelled.");
5405        assert!(should_cancel.get());
5406    }
5407
5408    #[test]
5409    fn netdata_function_api_reports_timeout_as_partial_table() {
5410        let dir = TempDir::new().expect("tempdir");
5411        write_netdata_test_journal(dir.path(), 10);
5412        let request = json!({
5413            "after": 1_700_000_000,
5414            "before": 1_700_000_010,
5415            "facets": ["SERVICE"],
5416            "histogram": "SERVICE",
5417            "last": 0
5418        });
5419        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5420        let options = NetdataFunctionRunOptions {
5421            timeout: Some(Duration::ZERO),
5422            ..NetdataFunctionRunOptions::default()
5423        };
5424
5425        let response = function
5426            .run_directory_request_json_with_options(dir.path(), &request, options)
5427            .expect("run function");
5428
5429        assert_eq!(response["status"], 200);
5430        assert_eq!(response["partial"], true);
5431        assert_eq!(response["message"]["status"], "warning");
5432        assert_eq!(
5433            response["message"]["title"],
5434            "Query timed-out, incomplete data. "
5435        );
5436    }
5437
5438    #[test]
5439    fn netdata_function_api_reports_sampling_counters() {
5440        let dir = TempDir::new().expect("tempdir");
5441        write_stepped_netdata_test_journal(
5442            dir.path(),
5443            "netdata-api-test.journal",
5444            5_000,
5445            1_700_000_000_000_000,
5446            1_000,
5447        );
5448        let request = json!({
5449            "after": 1_700_000_000,
5450            "before": 1_700_000_005,
5451            "facets": ["SERVICE"],
5452            "histogram": "SERVICE",
5453            "last": 5,
5454            "sampling": 20
5455        });
5456        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5457
5458        let response = function
5459            .run_directory_request_json_with_options(
5460                dir.path(),
5461                &request,
5462                NetdataFunctionRunOptions::from_timeout_seconds(0),
5463            )
5464            .expect("run function");
5465
5466        assert_eq!(response["status"], 200);
5467        assert!(
5468            response["_sampling"]["sampled"]
5469                .as_u64()
5470                .unwrap_or_default()
5471                > 0
5472        );
5473        assert!(
5474            response["_sampling"]["unsampled"]
5475                .as_u64()
5476                .unwrap_or_default()
5477                > 0
5478        );
5479        assert!(
5480            response["_sampling"]["estimated"]
5481                .as_u64()
5482                .unwrap_or_default()
5483                > 0
5484        );
5485        assert_eq!(
5486            response["items"]["estimated"],
5487            response["_sampling"]["estimated"]
5488        );
5489        assert!(
5490            response["items"]["unsampled"].as_u64().unwrap_or_default()
5491                < response["_sampling"]["unsampled"]
5492                    .as_u64()
5493                    .unwrap_or_default()
5494        );
5495        assert_eq!(response["message"]["status"], "notice");
5496    }
5497
5498    #[test]
5499    fn netdata_function_api_disables_sampling_for_data_only() {
5500        let dir = TempDir::new().expect("tempdir");
5501        write_netdata_test_journal(dir.path(), 5_000);
5502        let request = json!({
5503            "after": 1_700_000_000,
5504            "before": 1_700_000_010,
5505            "data_only": true,
5506            "last": 5,
5507            "sampling": 20
5508        });
5509        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5510
5511        let response = function
5512            .run_directory_request_json_with_options(
5513                dir.path(),
5514                &request,
5515                NetdataFunctionRunOptions::from_timeout_seconds(0),
5516            )
5517            .expect("run function");
5518
5519        assert_eq!(response["status"], 200);
5520        assert!(response.get("_sampling").is_none());
5521    }
5522
5523    #[test]
5524    fn normalizes_missing_time_window_to_last_hour_like_plugin() {
5525        assert_eq!(
5526            normalize_time_window(1_000_000_000, None, None),
5527            (Some(999_996_400_000_000), Some(1_000_000_000_999_999))
5528        );
5529    }
5530
5531    #[test]
5532    fn normalizes_inverted_time_window_like_plugin() {
5533        assert_eq!(
5534            normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000)),
5535            (Some(200_000_000_000_000), Some(200_000_100_999_999))
5536        );
5537    }
5538
5539    #[test]
5540    fn normalizes_equal_time_window_like_plugin() {
5541        assert_eq!(
5542            normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000)),
5543            (Some(199_996_400_000_000), Some(200_000_000_999_999))
5544        );
5545    }
5546
5547    #[test]
5548    fn normalizes_relative_time_window_like_plugin() {
5549        assert_eq!(
5550            normalize_time_window(1_000_000_000, Some(100), Some(200)),
5551            (Some(999_999_701_000_000), Some(999_999_800_999_999))
5552        );
5553    }
5554
5555    #[test]
5556    fn normalizes_missing_after_with_supplied_before_like_plugin() {
5557        assert_eq!(
5558            normalize_time_window(1_000_000_000, None, Some(200_000_000)),
5559            (Some(199_999_401_000_000), Some(200_000_000_999_999))
5560        );
5561    }
5562
5563    #[test]
5564    fn systemd_profile_transforms_priority_and_facility_for_display() {
5565        let profile = SystemdJournalProfile;
5566        let context = DisplayContext::default();
5567        assert_eq!(
5568            profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7"),
5569            json!("debug")
5570        );
5571        assert_eq!(
5572            profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3"),
5573            json!("daemon")
5574        );
5575        assert_eq!(priority_to_row_severity(b"3"), "critical");
5576        assert_eq!(priority_to_row_severity(b"6"), "normal");
5577    }
5578
5579    #[test]
5580    fn dynamic_process_name_matches_plugin_fallback_order() {
5581        let mut fields = BTreeMap::new();
5582        fields.insert("SYSLOG_IDENTIFIER".to_string(), vec![b"syslog".to_vec()]);
5583        fields.insert("_COMM".to_string(), vec![b"comm".to_vec()]);
5584        fields.insert("_PID".to_string(), vec![b"42".to_vec()]);
5585        fields.insert("SYSLOG_PID".to_string(), vec![b"99".to_vec()]);
5586        assert_eq!(dynamic_process_name(&fields), "syslog[42]");
5587
5588        fields.insert("CONTAINER_NAME".to_string(), vec![b"container".to_vec()]);
5589        assert_eq!(dynamic_process_name(&fields), "container[42]");
5590
5591        fields.remove("CONTAINER_NAME");
5592        fields.remove("SYSLOG_IDENTIFIER");
5593        fields.remove("_PID");
5594        assert_eq!(dynamic_process_name(&fields), "comm");
5595
5596        fields.remove("_COMM");
5597        fields.insert("_EXE".to_string(), vec![b"/usr/bin/app".to_vec()]);
5598        assert_eq!(dynamic_process_name(&fields), "-");
5599    }
5600
5601    #[test]
5602    fn facet_values_are_truncated_and_collapsed_like_plugin() {
5603        let prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
5604        let mut first = prefix.clone();
5605        first.extend_from_slice(b"-first");
5606        let mut second = prefix.clone();
5607        second.extend_from_slice(b"-second");
5608
5609        let mut values = BTreeMap::new();
5610        add_netdata_facet_count(&mut values, &first, 2);
5611        add_netdata_facet_count(&mut values, &second, 3);
5612
5613        assert_eq!(values.len(), 1);
5614        assert_eq!(values.get(&prefix), Some(&5));
5615    }
5616
5617    #[test]
5618    fn histogram_values_are_truncated_and_collapsed_like_plugin() {
5619        let prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
5620        let mut first = prefix.clone();
5621        first.extend_from_slice(b"-first");
5622        let mut second = prefix.clone();
5623        second.extend_from_slice(b"-second");
5624
5625        let mut values = HashMap::new();
5626        values.insert(first, 2);
5627        values.insert(second, 3);
5628        let histogram = ExplorerHistogram {
5629            field: b"TEST_FIELD".to_vec(),
5630            buckets: vec![ExplorerHistogramBucket {
5631                start_realtime_usec: 1_000_000,
5632                end_realtime_usec: 2_000_000,
5633                values,
5634            }],
5635        };
5636
5637        let function = NetdataJournalFunction::systemd_journal();
5638        let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);
5639        let labels = rendered["chart"]["result"]["labels"]
5640            .as_array()
5641            .expect("labels");
5642        assert_eq!(labels.len(), 2);
5643        assert_eq!(labels[1], Value::String(String::from_utf8(prefix).unwrap()));
5644        assert_eq!(rendered["chart"]["result"]["data"][0][1][0], json!(5));
5645    }
5646
5647    #[test]
5648    fn histogram_chart_metadata_includes_empty_dimension_arrays() {
5649        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5650        let empty = function.build_histogram(
5651            &DisplayContext::default(),
5652            &ExplorerHistogram {
5653                field: b"TRAP_SEVERITY".to_vec(),
5654                buckets: vec![ExplorerHistogramBucket {
5655                    start_realtime_usec: 1_700_000_000_000_000,
5656                    end_realtime_usec: 1_700_000_005_000_000,
5657                    values: HashMap::new(),
5658                }],
5659            },
5660            None,
5661        );
5662
5663        assert_eq!(empty["chart"]["view"]["dimensions"]["names"], json!([]));
5664        assert_eq!(empty["chart"]["view"]["dimensions"]["ids"], json!([]));
5665        assert_eq!(empty["chart"]["db"]["dimensions"]["names"], json!([]));
5666
5667        let mut values = HashMap::new();
5668        values.insert(b"warning".to_vec(), 7);
5669        let with_value = function.build_histogram(
5670            &DisplayContext::default(),
5671            &ExplorerHistogram {
5672                field: b"TRAP_SEVERITY".to_vec(),
5673                buckets: vec![ExplorerHistogramBucket {
5674                    start_realtime_usec: 1_700_000_000_000_000,
5675                    end_realtime_usec: 1_700_000_005_000_000,
5676                    values,
5677                }],
5678            },
5679            None,
5680        );
5681
5682        assert_eq!(
5683            with_value["chart"]["view"]["dimensions"]["names"],
5684            json!(["warning"])
5685        );
5686        assert_eq!(
5687            with_value["chart"]["view"]["dimensions"]["sts"]["min"],
5688            json!([7])
5689        );
5690    }
5691
5692    #[test]
5693    fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
5694        let mut backward = vec![
5695            test_located_row(100),
5696            test_located_row(100),
5697            test_located_row(100),
5698            test_located_row(90),
5699        ];
5700        make_row_timestamps_unique(&mut backward, Direction::Backward);
5701        assert_eq!(
5702            backward
5703                .iter()
5704                .map(|row| row.row.realtime_usec)
5705                .collect::<Vec<_>>(),
5706            vec![100, 99, 98, 90]
5707        );
5708
5709        let mut forward = vec![
5710            test_located_row(90),
5711            test_located_row(100),
5712            test_located_row(100),
5713            test_located_row(100),
5714        ];
5715        make_row_timestamps_unique(&mut forward, Direction::Forward);
5716        assert_eq!(
5717            forward
5718                .iter()
5719                .map(|row| row.row.realtime_usec)
5720                .collect::<Vec<_>>(),
5721            vec![90, 100, 101, 102]
5722        );
5723    }
5724
5725    #[test]
5726    fn page_window_counts_forward_anchor_like_netdata_facets() {
5727        let config = NetdataFunctionConfig::systemd_journal();
5728        let request = NetdataRequest::parse(
5729            &json!({
5730                "after": 1_700_000_000,
5731                "before": 1_700_000_010,
5732                "anchor": 1_700_000_005_000_000u64,
5733                "direction": "forward",
5734                "last": 2
5735            }),
5736            &config,
5737        )
5738        .expect("parse request");
5739        let mut window = NetdataPageWindow::for_request(&request);
5740
5741        for realtime_usec in [
5742            1_700_000_003_000_000,
5743            1_700_000_004_000_000,
5744            1_700_000_006_000_000,
5745            1_700_000_007_000_000,
5746            1_700_000_008_000_000,
5747        ] {
5748            window.observe(realtime_usec);
5749        }
5750
5751        let counters = window.counters();
5752        assert_eq!(counters.matched, 3);
5753        assert_eq!(counters.before, 1);
5754        assert_eq!(counters.after, 2);
5755    }
5756
5757    #[test]
5758    fn page_window_counts_backward_anchor_like_netdata_facets() {
5759        let config = NetdataFunctionConfig::systemd_journal();
5760        let request = NetdataRequest::parse(
5761            &json!({
5762                "after": 1_700_000_000,
5763                "before": 1_700_000_010,
5764                "anchor": 1_700_000_008_000_000u64,
5765                "direction": "backward",
5766                "last": 2
5767            }),
5768            &config,
5769        )
5770        .expect("parse request");
5771        let mut window = NetdataPageWindow::for_request(&request);
5772
5773        for realtime_usec in [
5774            1_700_000_009_000_000,
5775            1_700_000_007_000_000,
5776            1_700_000_006_000_000,
5777            1_700_000_005_000_000,
5778        ] {
5779            window.observe(realtime_usec);
5780        }
5781
5782        let counters = window.counters();
5783        assert_eq!(counters.matched, 3);
5784        assert_eq!(counters.before, 1);
5785        assert_eq!(counters.after, 1);
5786    }
5787
5788    #[test]
5789    fn page_window_counts_tail_anchor_like_netdata_facets() {
5790        let config = NetdataFunctionConfig::systemd_journal();
5791        let request = NetdataRequest::parse(
5792            &json!({
5793                "after": 1_700_000_000,
5794                "before": 1_700_000_010,
5795                "anchor": 1_700_000_008_000_000u64,
5796                "if_modified_since": 1_700_000_008_000_000u64,
5797                "data_only": true,
5798                "tail": true,
5799                "last": 2
5800            }),
5801            &config,
5802        )
5803        .expect("parse request");
5804        let mut window = NetdataPageWindow::for_request(&request);
5805
5806        for realtime_usec in [
5807            1_700_000_009_000_000,
5808            1_700_000_008_000_000,
5809            1_700_000_007_000_000,
5810        ] {
5811            window.observe(realtime_usec);
5812        }
5813
5814        let counters = window.counters();
5815        assert_eq!(counters.matched, 1);
5816        assert_eq!(counters.before, 0);
5817        assert_eq!(counters.after, 2);
5818    }
5819
5820    #[test]
5821    fn netdata_function_tail_anchor_with_no_new_filtered_rows_returns_304() {
5822        let dir = TempDir::new().expect("tempdir");
5823        let start_realtime_usec = 1_700_000_000_000_000;
5824        write_stepped_netdata_test_journal(
5825            dir.path(),
5826            "netdata-api-test.journal",
5827            2,
5828            start_realtime_usec,
5829            1_000_000,
5830        );
5831        let request = json!({
5832            "after": 1_700_000_000,
5833            "before": 1_700_000_002,
5834            "anchor": start_realtime_usec,
5835            "if_modified_since": start_realtime_usec,
5836            "data_only": true,
5837            "tail": true,
5838            "last": 5,
5839            "selections": {
5840                "SERVICE": ["even"]
5841            }
5842        });
5843        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5844
5845        let response = function
5846            .run_directory_request_json_with_options(
5847                dir.path(),
5848                &request,
5849                NetdataFunctionRunOptions::from_timeout_seconds(0),
5850            )
5851            .expect("run function");
5852
5853        assert_eq!(response["status"], 304);
5854        assert_eq!(
5855            response["errorMessage"],
5856            "No new data since the previous call."
5857        );
5858    }
5859
5860    #[test]
5861    fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
5862        let dir = TempDir::new().expect("tempdir");
5863        let start_realtime_usec = 1_700_000_000_000_000;
5864        write_stepped_netdata_test_journal(
5865            dir.path(),
5866            "paging-contract.journal",
5867            7,
5868            start_realtime_usec,
5869            1,
5870        );
5871
5872        let backward = collect_netdata_pages(dir.path(), Direction::Backward, 2);
5873        assert_eq!(
5874            backward.messages,
5875            vec![
5876                "row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"
5877            ]
5878        );
5879        assert_unique_messages(&backward.messages);
5880        assert_eq!(
5881            backward.timestamps,
5882            vec![
5883                start_realtime_usec + 6,
5884                start_realtime_usec + 5,
5885                start_realtime_usec + 4,
5886                start_realtime_usec + 3,
5887                start_realtime_usec + 2,
5888                start_realtime_usec + 1,
5889                start_realtime_usec,
5890            ]
5891        );
5892
5893        let forward = collect_netdata_pages(dir.path(), Direction::Forward, 2);
5894        assert_eq!(
5895            forward.messages,
5896            vec![
5897                "row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"
5898            ]
5899        );
5900        assert_unique_messages(&forward.messages);
5901        assert_eq!(
5902            forward.timestamps,
5903            vec![
5904                start_realtime_usec + 1,
5905                start_realtime_usec,
5906                start_realtime_usec + 3,
5907                start_realtime_usec + 2,
5908                start_realtime_usec + 5,
5909                start_realtime_usec + 4,
5910                start_realtime_usec + 6,
5911            ]
5912        );
5913    }
5914
5915    #[test]
5916    fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
5917        let dir = TempDir::new().expect("tempdir");
5918        let start_realtime_usec = 1_700_000_000_000_000;
5919        write_stepped_netdata_test_journal(
5920            dir.path(),
5921            "tail-contract.journal",
5922            5,
5923            start_realtime_usec,
5924            1,
5925        );
5926        let anchor = start_realtime_usec + 2;
5927
5928        for requested_direction in [Direction::Backward, Direction::Forward] {
5929            let response = run_netdata_contract_request(
5930                dir.path(),
5931                json!({
5932                    "after": 1_700_000_000,
5933                    "before": 1_700_000_010,
5934                    "last": 5,
5935                    "direction": direction_name(requested_direction),
5936                    "data_only": true,
5937                    "tail": true,
5938                    "if_modified_since": anchor,
5939                    "anchor": anchor,
5940                }),
5941            );
5942            assert_eq!(response["status"], 200);
5943            assert_eq!(response["_request"]["direction"], "backward");
5944            assert_eq!(
5945                response_column_strings(&response, "MESSAGE"),
5946                vec!["row-4", "row-3"]
5947            );
5948            assert_eq!(
5949                response_column_u64s(&response, "timestamp"),
5950                vec![start_realtime_usec + 4, start_realtime_usec + 3]
5951            );
5952        }
5953
5954        let no_change = run_netdata_contract_request(
5955            dir.path(),
5956            json!({
5957                "after": 1_700_000_000,
5958                "before": 1_700_000_010,
5959                "last": 5,
5960                "direction": "backward",
5961                "data_only": true,
5962                "tail": true,
5963                "if_modified_since": start_realtime_usec + 4,
5964                "anchor": start_realtime_usec + 4,
5965            }),
5966        );
5967        assert_eq!(no_change["status"], 304);
5968        assert_eq!(
5969            no_change["errorMessage"],
5970            "No new data since the previous call."
5971        );
5972    }
5973
5974    #[test]
5975    fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
5976        let dir = TempDir::new().expect("tempdir");
5977        let start_realtime_usec = 1_700_000_000_000_000;
5978        write_stepped_netdata_test_journal(
5979            dir.path(),
5980            "delta-contract.journal",
5981            5,
5982            start_realtime_usec,
5983            1,
5984        );
5985        let anchor = start_realtime_usec + 1;
5986
5987        let response = run_netdata_contract_request(
5988            dir.path(),
5989            json!({
5990                "after": 1_700_000_000,
5991                "before": 1_700_000_010,
5992                "last": 2,
5993                "direction": "backward",
5994                "data_only": true,
5995                "delta": true,
5996                "tail": true,
5997                "if_modified_since": anchor,
5998                "anchor": anchor,
5999                "facets": ["SERVICE"],
6000                "histogram": "SERVICE",
6001            }),
6002        );
6003
6004        assert_eq!(response["status"], 200);
6005        assert_eq!(
6006            response_column_strings(&response, "MESSAGE"),
6007            vec!["row-4", "row-3"]
6008        );
6009        assert_eq!(
6010            response_facet_count(&response, "facets_delta", "SERVICE", "even"),
6011            2
6012        );
6013        assert_eq!(
6014            response_facet_count(&response, "facets_delta", "SERVICE", "odd"),
6015            1
6016        );
6017        assert_eq!(
6018            response_histogram_total(&response, "histogram_delta", "even"),
6019            2
6020        );
6021        assert_eq!(
6022            response_histogram_total(&response, "histogram_delta", "odd"),
6023            1
6024        );
6025        let items = response["items_delta"].as_object().expect("items_delta");
6026        assert_eq!(items["matched"], 3);
6027        assert_eq!(items["returned"], 2);
6028    }
6029
6030    #[test]
6031    fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
6032        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);
6033
6034        assert_eq!(adjuster.adjust(10), 10);
6035        assert_eq!(adjuster.adjust(10), 11);
6036        assert_eq!(adjuster.adjust(10), 12);
6037    }
6038
6039    #[test]
6040    fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
6041        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);
6042
6043        assert_eq!(adjuster.adjust(10), 10);
6044        assert_eq!(adjuster.adjust(10), 9);
6045        assert_eq!(adjuster.adjust(10), 8);
6046    }
6047
6048    #[test]
6049    fn systemd_profile_keeps_user_group_ids_raw_by_default() {
6050        let context = DisplayContext::default();
6051        let profile = SystemdJournalProfile;
6052        assert_eq!(
6053            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6054            json!("0")
6055        );
6056        assert_eq!(
6057            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6058            json!("0")
6059        );
6060    }
6061
6062    #[cfg(unix)]
6063    #[test]
6064    fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
6065        let context = DisplayContext::default();
6066        let profile = SystemdJournalPluginProfile;
6067        assert_eq!(
6068            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6069            json!("root")
6070        );
6071        assert_eq!(
6072            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6073            json!("root")
6074        );
6075    }
6076
6077    #[cfg(unix)]
6078    #[test]
6079    fn plugin_compatible_profile_caches_user_group_resolution() {
6080        let context = DisplayContext::default();
6081        let profile = SystemdJournalPluginProfile;
6082
6083        assert_eq!(
6084            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
6085            json!("root")
6086        );
6087        assert_eq!(
6088            profile.field_display_value(&context, DisplayScope::Data, "_UID", b"0"),
6089            json!("root")
6090        );
6091        assert_eq!(
6092            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
6093            json!("root")
6094        );
6095        assert_eq!(
6096            profile.field_display_value(&context, DisplayScope::Data, "_GID", b"0"),
6097            json!("root")
6098        );
6099
6100        assert_eq!(context.uid_display_cache.borrow().len(), 1);
6101        assert_eq!(context.gid_display_cache.borrow().len(), 1);
6102    }
6103
6104    #[test]
6105    fn file_overlap_uses_netdata_max_realtime_slack() {
6106        let file_first_seconds = 200_000_000u64;
6107        let file_last_seconds = 200_000_100u64;
6108        let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
6109        let header = crate::FileHeader {
6110            signature: *b"LPKSHHRH",
6111            compatible_flags: 0,
6112            incompatible_flags: 0,
6113            state: 0,
6114            header_size: 0,
6115            n_entries: 0,
6116            head_entry_realtime: file_first_seconds * 1_000_000,
6117            tail_entry_realtime: file_last_seconds * 1_000_000,
6118            head_entry_seqnum: 0,
6119            tail_entry_seqnum: 0,
6120            tail_entry_boot_id: [0; 16],
6121            seqnum_id: [0; 16],
6122        };
6123        let config = NetdataFunctionConfig::systemd_journal();
6124
6125        let inside_slack = NetdataRequest::parse(
6126            &json!({
6127                "after": file_last_seconds + slack_seconds - 1,
6128                "before": file_last_seconds + slack_seconds + 500
6129            }),
6130            &config,
6131        )
6132        .expect("parse request");
6133        assert!(file_may_overlap_request(header, &inside_slack));
6134
6135        let outside_slack = NetdataRequest::parse(
6136            &json!({
6137                "after": file_last_seconds + slack_seconds + 1,
6138                "before": file_last_seconds + slack_seconds + 500
6139            }),
6140            &config,
6141        )
6142        .expect("parse request");
6143        assert!(!file_may_overlap_request(header, &outside_slack));
6144    }
6145
6146    #[test]
6147    fn journal_file_order_matches_plugin_comparator_shape() {
6148        let older = JournalFileOrderInfo {
6149            msg_first_realtime_usec: 100,
6150            msg_last_realtime_usec: 200,
6151            file_last_modified_usec: 200,
6152            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6153        };
6154        let newer = JournalFileOrderInfo {
6155            msg_first_realtime_usec: 100,
6156            msg_last_realtime_usec: 300,
6157            file_last_modified_usec: 100,
6158            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6159        };
6160        assert_eq!(
6161            compare_journal_file_order(&newer, &older, Direction::Backward),
6162            Ordering::Less
6163        );
6164        assert_eq!(
6165            compare_journal_file_order(&newer, &older, Direction::Forward),
6166            Ordering::Greater
6167        );
6168
6169        let newer_mtime = JournalFileOrderInfo {
6170            msg_first_realtime_usec: 100,
6171            msg_last_realtime_usec: 200,
6172            file_last_modified_usec: 300,
6173            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6174        };
6175        assert_eq!(
6176            compare_journal_file_order(&newer_mtime, &older, Direction::Backward),
6177            Ordering::Less
6178        );
6179
6180        let newer_first = JournalFileOrderInfo {
6181            msg_first_realtime_usec: 150,
6182            msg_last_realtime_usec: 200,
6183            file_last_modified_usec: 200,
6184            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
6185        };
6186        assert_eq!(
6187            compare_journal_file_order(&newer_first, &older, Direction::Backward),
6188            Ordering::Less
6189        );
6190    }
6191
6192    #[test]
6193    fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
6194        let mut boot_first = BTreeMap::new();
6195        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 300);
6196        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 100);
6197        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 200);
6198
6199        assert_eq!(boot_first.get(b"boot-a".as_slice()), Some(&100));
6200    }
6201
6202    #[test]
6203    fn merge_histogram_rejects_inconsistent_bucket_shape() {
6204        let mut target = Some(ExplorerHistogram {
6205            field: b"PRIORITY".to_vec(),
6206            buckets: vec![ExplorerHistogramBucket {
6207                start_realtime_usec: 1_000_000,
6208                end_realtime_usec: 2_000_000,
6209                values: HashMap::new(),
6210            }],
6211        });
6212        let source = ExplorerHistogram {
6213            field: b"PRIORITY".to_vec(),
6214            buckets: vec![ExplorerHistogramBucket {
6215                start_realtime_usec: 1_000_000,
6216                end_realtime_usec: 1_500_000,
6217                values: HashMap::new(),
6218            }],
6219        };
6220
6221        let err = merge_histogram(&mut target, source).expect_err("shape mismatch");
6222        assert!(matches!(
6223            err,
6224            SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
6225        ));
6226    }
6227
6228    #[test]
6229    fn collect_journal_files_recurses_nested_directories() {
6230        let dir = TempDir::new().expect("tempdir");
6231        let nested = dir.path().join("machine").join("nested");
6232        write_named_netdata_test_journal(&nested, "system.journal", 1, 1_700_000_000_000_000);
6233
6234        let collection = collect_journal_files(dir.path()).expect("collect files");
6235
6236        assert_eq!(collection.files.len(), 1);
6237        assert_eq!(collection.skipped, 0);
6238        assert!(collection.errors.is_empty());
6239        assert_eq!(
6240            collection.files[0]
6241                .file_name()
6242                .and_then(|name| name.to_str()),
6243            Some("system.journal")
6244        );
6245    }
6246
6247    #[cfg(unix)]
6248    #[test]
6249    fn collect_journal_files_deduplicates_symlinked_directories() {
6250        let dir = TempDir::new().expect("tempdir");
6251        let real = dir.path().join("real");
6252        let link = dir.path().join("link");
6253        write_named_netdata_test_journal(&real, "system.journal", 1, 1_700_000_000_000_000);
6254        std::os::unix::fs::symlink(&real, &link).expect("symlink");
6255
6256        let collection = collect_journal_files(dir.path()).expect("collect files");
6257
6258        assert_eq!(collection.files.len(), 1);
6259        assert_eq!(collection.skipped, 0);
6260        assert!(collection.errors.is_empty());
6261    }
6262
6263    #[cfg(unix)]
6264    #[test]
6265    fn collect_journal_files_deduplicates_symlinked_files() {
6266        let dir = TempDir::new().expect("tempdir");
6267        write_named_netdata_test_journal(dir.path(), "system.journal", 1, 1_700_000_000_000_000);
6268        std::os::unix::fs::symlink(
6269            dir.path().join("system.journal"),
6270            dir.path().join("system-copy.journal"),
6271        )
6272        .expect("symlink");
6273
6274        let collection = collect_journal_files(dir.path()).expect("collect files");
6275
6276        assert_eq!(collection.files.len(), 1);
6277        assert_eq!(collection.skipped, 0);
6278        assert!(collection.errors.is_empty());
6279    }
6280
6281    #[cfg(unix)]
6282    #[test]
6283    fn collect_journal_files_reports_unreadable_subdirectories() {
6284        use std::os::unix::fs::PermissionsExt;
6285
6286        if unsafe { libc::geteuid() } == 0 {
6287            return;
6288        }
6289
6290        let dir = TempDir::new().expect("tempdir");
6291        std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
6292        let locked = dir.path().join("locked");
6293        std::fs::create_dir(&locked).expect("locked dir");
6294        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
6295            .expect("lock dir");
6296
6297        let collection = collect_journal_files(dir.path()).expect("collect files");
6298
6299        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
6300            .expect("unlock dir");
6301        assert_eq!(collection.files.len(), 1);
6302        assert_eq!(collection.skipped, 1);
6303        assert_eq!(collection.errors.len(), 1);
6304        assert!(collection.errors[0].contains("locked"));
6305    }
6306
6307    #[test]
6308    fn source_summary_fills_missing_caller_metadata_from_header() {
6309        let dir = TempDir::new().expect("tempdir");
6310        write_named_netdata_test_journal(dir.path(), "system.journal", 2, 1_700_000_000_000_000);
6311        let path = dir.path().join("system.journal");
6312        let mut state = TestNetdataState::default();
6313        state.metadata.insert(
6314            path.clone(),
6315            NetdataJournalFileMetadata {
6316                msg_first_realtime_usec: Some(1_699_999_999_000_000),
6317                ..NetdataJournalFileMetadata::default()
6318            },
6319        );
6320        let options = NetdataFunctionRunOptions {
6321            state: Some(&mut state),
6322            ..NetdataFunctionRunOptions::default()
6323        };
6324
6325        let summary = JournalSourceSummary::from_paths(&[path], ReaderOptions::default(), &options);
6326
6327        assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
6328        assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
6329    }
6330
6331    #[test]
6332    fn source_selection_echoes_and_filters_known_groups() {
6333        let config = NetdataFunctionConfig::systemd_journal();
6334        let request = NetdataRequest::parse(
6335            &json!({
6336                "selections": {
6337                    "__logs_sources": ["all-local-system-logs"]
6338                }
6339            }),
6340            &config,
6341        )
6342        .expect("parse source-filtered request");
6343
6344        assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
6345        assert_eq!(
6346            request.echo.get("source_type").and_then(Value::as_u64),
6347            Some(SOURCE_TYPE_LOCAL_SYSTEM)
6348        );
6349        assert!(
6350            request
6351                .echo
6352                .pointer("/selections/__logs_sources/0")
6353                .is_some_and(Value::is_null)
6354        );
6355        assert!(request.matches_source(Path::new("/var/log/journal/machine/system.journal"), None));
6356        assert!(!request.matches_source(
6357            Path::new("/var/log/journal/machine/user-1000.journal"),
6358            None
6359        ));
6360    }
6361
6362    #[test]
6363    fn source_selection_uses_caller_metadata_before_filename_fallback() {
6364        let dir = TempDir::new().expect("tempdir");
6365        write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
6366        let path = dir.path().join("user-1000.journal");
6367        let config = NetdataFunctionConfig::systemd_journal();
6368        let request = NetdataRequest::parse(
6369            &json!({
6370                "after": 1_700_000_000,
6371                "before": 1_700_000_001,
6372                "selections": {
6373                    "__logs_sources": ["all-local-system-logs"]
6374                }
6375            }),
6376            &config,
6377        )
6378        .expect("parse source-filtered request");
6379        assert!(!request.matches_source(&path, None));
6380
6381        let mut state = TestNetdataState::default();
6382        state.metadata.insert(
6383            path.clone(),
6384            NetdataJournalFileMetadata {
6385                source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
6386                source_name: Some("system-registry".to_string()),
6387                ..NetdataJournalFileMetadata::default()
6388            },
6389        );
6390        let options = NetdataFunctionRunOptions {
6391            state: Some(&mut state),
6392            ..NetdataFunctionRunOptions::default()
6393        };
6394        let selected =
6395            select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
6396
6397        assert_eq!(selected.files.len(), 1);
6398    }
6399
6400    #[test]
6401    fn netdata_function_state_receives_learned_source_realtime_delta() {
6402        let dir = TempDir::new().expect("tempdir");
6403        let commit_realtime_usec = 1_700_000_030_000_000;
6404        let source_realtime_usec = commit_realtime_usec - 30_000_000;
6405        write_source_realtime_delta_journal(
6406            dir.path(),
6407            "system.journal",
6408            commit_realtime_usec,
6409            source_realtime_usec,
6410        );
6411        let request = json!({
6412            "after": 1_700_000_029,
6413            "before": 1_700_000_031,
6414            "facets": ["SERVICE"],
6415            "histogram": "SERVICE",
6416            "last": 1,
6417            "sampling": 0
6418        });
6419        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
6420        let mut state = TestNetdataState::default();
6421        let options = NetdataFunctionRunOptions {
6422            state: Some(&mut state),
6423            ..NetdataFunctionRunOptions::from_timeout_seconds(0)
6424        };
6425
6426        let response = function
6427            .run_directory_request_json_with_options(dir.path(), &request, options)
6428            .expect("run function");
6429
6430        assert_eq!(response["status"], 200);
6431        assert_eq!(state.updates.len(), 1);
6432        assert_eq!(state.updates[0].1, 30_000_000);
6433    }
6434
6435    #[test]
6436    fn source_classification_matches_plugin_filename_shape() {
6437        assert_eq!(
6438            journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
6439            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
6440        );
6441        assert_eq!(
6442            journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
6443            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
6444        );
6445        assert_eq!(
6446            journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
6447            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
6448        );
6449        assert_eq!(
6450            journal_file_source_type(Path::new(
6451                "/var/log/journal/machine.namespace/system.journal"
6452            )),
6453            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
6454        );
6455        assert_eq!(
6456            journal_file_source_type(Path::new(
6457                "/var/log/journal/remote/remote-host-a@machine.journal"
6458            )),
6459            SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
6460        );
6461    }
6462
6463    #[test]
6464    fn exact_source_names_follow_plugin_prefixes() {
6465        assert_eq!(
6466            journal_file_exact_source_name(Path::new(
6467                "/var/log/journal/machine.namespace/system.journal"
6468            ))
6469            .as_deref(),
6470            Some("namespace-namespace")
6471        );
6472        assert_eq!(
6473            journal_file_exact_source_name(Path::new(
6474                "/var/log/journal/remote/remote-host-a@machine.journal"
6475            ))
6476            .as_deref(),
6477            Some("remote-host-a")
6478        );
6479        assert_eq!(
6480            journal_file_exact_source_name(Path::new(
6481                "/var/log/journal/remote/remote-host-b.journal~.zst"
6482            ))
6483            .as_deref(),
6484            Some("remote-host-b")
6485        );
6486    }
6487
6488    #[test]
6489    fn disposed_journal_extension_matches_plugin_scan_contract() {
6490        assert!(is_journal_file_name(Path::new("active.journal")));
6491        assert!(is_journal_file_name(Path::new("archived.journal~")));
6492        assert!(is_journal_file_name(Path::new("active.journal.zst")));
6493        assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
6494    }
6495
6496    fn test_located_row(realtime_usec: u64) -> LocatedRow {
6497        LocatedRow {
6498            file_path: PathBuf::from("test.journal"),
6499            row: ExplorerRow {
6500                realtime_usec,
6501                cursor: String::new(),
6502                payloads: Vec::new(),
6503            },
6504        }
6505    }
6506
6507    fn write_source_realtime_delta_journal(
6508        directory: &std::path::Path,
6509        name: &str,
6510        commit_realtime_usec: u64,
6511        source_realtime_usec: u64,
6512    ) {
6513        std::fs::create_dir_all(directory).expect("create journal dir");
6514        let path = directory.join(name);
6515        let repo_file = RepoFile::from_path(&path).expect("repo file");
6516        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6517        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6518        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6519        let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6520        let payloads: [&[u8]; 4] = [
6521            b"MESSAGE=source lag test".as_slice(),
6522            b"SERVICE=delta".as_slice(),
6523            b"PRIORITY=6".as_slice(),
6524            source.as_bytes(),
6525        ];
6526        writer
6527            .add_entry(
6528                &mut file,
6529                &payloads,
6530                commit_realtime_usec,
6531                commit_realtime_usec,
6532            )
6533            .expect("write entry");
6534        file.sync().expect("sync journal");
6535    }
6536}