Skip to main content

journal/
netdata.rs

1use crate::explorer::{ExplorerSamplingState, histogram_bucket_count_for_query};
2use crate::{
3    Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
4    ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
5    ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
6    FileReader, ReaderOptions, Result, SdkError,
7};
8use chrono::{DateTime, Utc};
9use serde_json::{Map, Value, json};
10use std::cell::RefCell;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
13#[cfg(unix)]
14use std::ffi::CStr;
15use std::path::{Path, PathBuf};
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
// Request defaults and scan limits.
// NOTE(review): several of these constants are not referenced in the part of the
// file reviewed here; for those, the remarks below only restate the literal value.

/// Function name installed by `NetdataFunctionConfig::systemd_journal`.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
// Presumably the page size when a request does not specify one — confirm at the use site.
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
// 3600 seconds, i.e. one hour.
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
// Three 365-day years expressed in seconds.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
// 600 seconds, i.e. ten minutes.
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
/// One hundred 365-day years in seconds. `NetdataFunctionRunOptions` uses this as the
/// timeout when the caller passes `0` seconds or relies on `Default`.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
/// Five seconds in microseconds; passed to `to_explorer_query` by `explore_files`.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
// Two minutes in microseconds.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
/// Facet option id emitted by `build_facets` for a zero-count, empty-valued entry.
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
/// Facet option name emitted by `build_facets` for a zero-count, empty-valued entry.
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
// NOTE(review): the two limits below look like bounds for the directory walk done by
// `collect_journal_files` — confirm, that helper is not visible here.
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Journal source categories, one distinct bit each (bits 0 through 6), so several
// categories can be combined in a single `u64` mask. They are re-exported publicly
// under the `NETDATA_SOURCE_TYPE_*` names.
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
41
// Public aliases of the private `SOURCE_TYPE_*` bit flags; each one has exactly the
// value of the constant it names. `NetdataJournalFileMetadata::source_type` is an
// `Option<u64>` — presumably built from these bits (confirm against callers).

/// Alias of `SOURCE_TYPE_ALL` (bit 0).
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
/// Alias of `SOURCE_TYPE_LOCAL_ALL` (bit 1).
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
/// Alias of `SOURCE_TYPE_REMOTE_ALL` (bit 2).
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
/// Alias of `SOURCE_TYPE_LOCAL_SYSTEM` (bit 3).
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
/// Alias of `SOURCE_TYPE_LOCAL_USER` (bit 4).
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
/// Alias of `SOURCE_TYPE_LOCAL_NAMESPACE` (bit 5).
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
/// Alias of `SOURCE_TYPE_LOCAL_OTHER` (bit 6).
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
49
/// Fixed list of request parameter names.
///
/// NOTE(review): presumably the base of the `accepted_params` array produced by
/// `accepted_params_from_fields`; that helper is not visible in the reviewed part of
/// the file, so confirm before relying on it.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];
68
/// Field names copied into `NetdataFunctionConfig::default_view_keys` by
/// `NetdataFunctionConfig::systemd_journal`.
///
/// `build_columns` appends the configured view keys, in this order, right after the
/// fixed `timestamp` and `rowOptions` columns.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];
93
/// Field names copied into `NetdataFunctionConfig::default_facets` by
/// `NetdataFunctionConfig::systemd_journal`.
///
/// NOTE(review): presumably the facet set used when a request names none; the
/// request parser that would apply it is not visible here — confirm.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
154
/// Static configuration of a [`NetdataJournalFunction`].
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Name of the function (`"systemd-journal"` in the stock configuration).
    pub function_name: String,
    /// Facet field names used as the configured default set.
    pub default_facets: Vec<String>,
    /// Field names placed in the column order right after the fixed `timestamp` and
    /// `rowOptions` columns.
    pub default_view_keys: Vec<String>,
    /// Default histogram field, if any (`PRIORITY` in the stock configuration).
    pub default_histogram: Option<String>,
    /// Options handed to `FileReader::open_with_options` for every journal file.
    pub reader_options: ReaderOptions,
    /// Strategy handed to the reader when a file is explored.
    pub explorer_strategy: ExplorerStrategy,
}
164
165impl NetdataFunctionConfig {
166    pub fn systemd_journal() -> Self {
167        Self {
168            function_name: DEFAULT_FUNCTION_NAME.to_string(),
169            default_facets: SYSTEMD_DEFAULT_FACETS
170                .iter()
171                .map(|field| (*field).to_string())
172                .collect(),
173            default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
174                .iter()
175                .map(|field| (*field).to_string())
176                .collect(),
177            default_histogram: Some("PRIORITY".to_string()),
178            reader_options: ReaderOptions::snapshot(),
179            explorer_strategy: ExplorerStrategy::Traversal,
180        }
181    }
182}
183
184impl Default for NetdataFunctionConfig {
185    fn default() -> Self {
186        Self::systemd_journal()
187    }
188}
189
/// Per-response lookup data handed to [`NetdataFunctionProfile`] display hooks.
#[derive(Debug, Default)]
pub struct DisplayContext {
    // Filled by `query_response_artifacts` from `collect_boot_first_realtime`.
    // Presumably maps a boot id to the first realtime timestamp seen for that boot —
    // confirm against that helper.
    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    // Interior-mutable caches so display code can fill them through `&DisplayContext`.
    // NOTE(review): presumably raw uid/gid text -> display text; the code that uses
    // them is not visible here.
    uid_display_cache: RefCell<BTreeMap<String, String>>,
    gid_display_cache: RefCell<BTreeMap<String, String>>,
}
196
/// Tells a [`NetdataFunctionProfile`] which part of the response a value is being
/// rendered for.
#[derive(Debug, Clone, Copy)]
pub enum DisplayScope {
    /// A cell of a data row (used by `build_data_rows`).
    Data,
    /// The name of a facet option (used by `facet_option_name`).
    Facet,
    /// A histogram dimension label (used by `build_histogram`).
    Histogram,
}
203
204pub trait NetdataFunctionProfile {
205    fn field_display_value(
206        &self,
207        _context: &DisplayContext,
208        _scope: DisplayScope,
209        _field: &str,
210        value: &[u8],
211    ) -> Value {
212        Value::String(String::from_utf8_lossy(value).into_owned())
213    }
214
215    fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
216        match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
217            Value::String(value) => value,
218            other => other.to_string(),
219        }
220    }
221
222    fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
223        if let Some(priority) = first_value(fields, "PRIORITY") {
224            return json!({ "severity": priority_to_row_severity(priority) });
225        }
226        json!({ "severity": "normal" })
227    }
228}
229
/// Profile whose `field_display_value` forwards to `systemd_field_display_value`
/// with the trailing flag set to `false`. This is the default profile parameter of
/// [`NetdataJournalFunction`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalProfile;
232
/// Profile whose `field_display_value` forwards to `systemd_field_display_value`
/// with the trailing flag set to `true`. It is the profile used by
/// `NetdataJournalFunction::systemd_journal_plugin_compatible`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalPluginProfile;
235
// Only `field_display_value` is overridden; `facet_option_name` and `row_options`
// keep the trait defaults.
impl NetdataFunctionProfile for SystemdJournalProfile {
    /// Forwards to `systemd_field_display_value` with the final argument `false`.
    // NOTE(review): that flag presumably selects plugin-compatible rendering (the
    // sibling `SystemdJournalPluginProfile` passes `true`) — confirm in the helper.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, false)
    }
}
247
// Only `field_display_value` is overridden; `facet_option_name` and `row_options`
// keep the trait defaults.
impl NetdataFunctionProfile for SystemdJournalPluginProfile {
    /// Forwards to `systemd_field_display_value` with the final argument `true`.
    // NOTE(review): that flag presumably selects plugin-compatible rendering (the
    // sibling `SystemdJournalProfile` passes `false`) — confirm in the helper.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, true)
    }
}
259
/// A journal log function: a configuration plus the display profile `P` used to
/// render values. `P` defaults to [`SystemdJournalProfile`].
#[derive(Debug, Clone)]
pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    // Defaults, reader options and explorer strategy used by every request.
    config: NetdataFunctionConfig,
    // Display hooks used when building rows, facets and histogram labels.
    profile: P,
}
265
/// Progress snapshot passed to `NetdataFunctionRunOptions::progress_callback`.
///
/// NOTE(review): the code that fills this struct (`emit_progress_for_combined`,
/// `emit_explorer_progress`) is not visible here; the field notes are inferred from
/// names and types and should be confirmed there.
#[derive(Debug, Clone)]
pub struct NetdataFunctionProgress {
    /// Presumably the position of the file being processed.
    pub current_file: usize,
    /// Presumably the number of files selected for the request.
    pub total_files: usize,
    /// Presumably the number of files opened successfully so far.
    pub matched_files: u64,
    /// Presumably the number of files skipped so far.
    pub skipped_files: u64,
    /// Explorer statistics at the time of the snapshot.
    pub stats: ExplorerStats,
    /// Presumably the time elapsed since exploration started.
    pub elapsed: Duration,
}
275
/// Optional per-file metadata a [`NetdataFunctionState`] can supply.
///
/// Every field is optional and the derived `Default` leaves all of them `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetdataJournalFileMetadata {
    /// Source category mask; presumably built from the `NETDATA_SOURCE_TYPE_*` bits.
    pub source_type: Option<u64>,
    /// Name of the source, if known.
    pub source_name: Option<String>,
    /// File modification time; microseconds, going by the `_usec` suffix.
    pub file_last_modified_usec: Option<u64>,
    /// Realtime timestamp of the first message; microseconds, going by the suffix.
    pub msg_first_realtime_usec: Option<u64>,
    /// Realtime timestamp of the last message; microseconds, going by the suffix.
    pub msg_last_realtime_usec: Option<u64>,
    /// Journal-vs-realtime delta recorded for the file; microseconds, going by the suffix.
    pub journal_vs_realtime_delta_usec: Option<u64>,
}
285
/// Optional caller-provided state consulted while a request runs.
///
/// Both methods have do-nothing defaults, so an implementor may override either one.
pub trait NetdataFunctionState {
    /// Returns known metadata for `_path`. The default knows nothing and returns `None`.
    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
        None
    }

    /// Receives a journal-vs-realtime delta for `_path`. The default discards it.
    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
}
293
/// Per-call execution options for a [`NetdataJournalFunction`] request.
pub struct NetdataFunctionRunOptions<'a> {
    /// Time budget for the call. `run_directory_request_json_with_options` turns it
    /// into a deadline of `Instant::now() + timeout`; `None` means no deadline.
    pub timeout: Option<Duration>,
    /// Optional sink for [`NetdataFunctionProgress`] snapshots.
    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional callback handed to the explorer control as its cancellation callback.
    // NOTE(review): presumably returning `true` requests cancellation — confirm in
    // `ExplorerControl`.
    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller state (file metadata lookups and delta updates).
    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Progress interval handed to the explorer control (250 ms in the provided
    /// constructors).
    pub progress_interval: Duration,
}
301
302impl NetdataFunctionRunOptions<'_> {
303    pub fn from_timeout_seconds(seconds: u64) -> Self {
304        let seconds = if seconds == 0 {
305            EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
306        } else {
307            seconds
308        };
309        Self {
310            timeout: Some(Duration::from_secs(seconds)),
311            progress_callback: None,
312            cancellation_callback: None,
313            state: None,
314            progress_interval: Duration::from_millis(250),
315        }
316    }
317}
318
319impl Default for NetdataFunctionRunOptions<'_> {
320    fn default() -> Self {
321        Self {
322            timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
323            progress_callback: None,
324            cancellation_callback: None,
325            state: None,
326            progress_interval: Duration::from_millis(250),
327        }
328    }
329}
330
331impl NetdataJournalFunction<SystemdJournalProfile> {
332    pub fn systemd_journal() -> Self {
333        Self {
334            config: NetdataFunctionConfig::systemd_journal(),
335            profile: SystemdJournalProfile,
336        }
337    }
338}
339
340impl NetdataJournalFunction<SystemdJournalPluginProfile> {
341    pub fn systemd_journal_plugin_compatible() -> Self {
342        Self {
343            config: NetdataFunctionConfig::systemd_journal(),
344            profile: SystemdJournalPluginProfile,
345        }
346    }
347}
348
349impl<P> NetdataJournalFunction<P>
350where
351    P: NetdataFunctionProfile,
352{
353    pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
354        Self { config, profile }
355    }
356
357    pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
358        self.run_directory_request_json_with_options(
359            directory,
360            request,
361            NetdataFunctionRunOptions::default(),
362        )
363    }
364
365    pub fn run_directory_request_json_with_options(
366        &self,
367        directory: &Path,
368        request: &Value,
369        mut options: NetdataFunctionRunOptions<'_>,
370    ) -> Result<Value> {
371        let request = NetdataRequest::parse(request, &self.config)?;
372        let collection = collect_journal_files(directory)?;
373        let paths = collection.files;
374        if request.info {
375            return Ok(self.info_response(request.echo, &paths, &options));
376        }
377        let annotation_paths = paths.clone();
378
379        let selected =
380            select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
381        if let Some(response) = not_modified_before_scan_response(&request, &selected) {
382            return Ok(response);
383        }
384        let selected_files = selected.files;
385        let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
386        let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
387        self.finalize_combined_result(
388            &request,
389            &selected_files,
390            deadline,
391            &mut options,
392            &mut combined,
393            collection.skipped,
394            collection.errors,
395        )?;
396        Ok(self.query_response(request, &annotation_paths, combined))
397    }
398
399    fn finalize_combined_result(
400        &self,
401        request: &NetdataRequest,
402        selected_files: &[SelectedJournalFile],
403        deadline: Option<Instant>,
404        options: &mut NetdataFunctionRunOptions<'_>,
405        combined: &mut CombinedResult,
406        skipped_files: u64,
407        file_errors: Vec<String>,
408    ) -> Result<()> {
409        combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
410        combined.file_errors.extend(file_errors);
411        if combined.cancelled {
412            return Ok(());
413        }
414        if !request.data_only {
415            combined.add_zero_count_facet_values_from_files(
416                &request.facets,
417                self.config.reader_options,
418            );
419            combined.add_zero_count_selected_filter_values(request);
420        }
421        if should_collect_unfiltered_facet_vocabulary(request, combined) {
422            let vocabulary = self.explore_files(
423                selected_files,
424                &request.unfiltered_vocabulary(),
425                deadline,
426                options,
427            )?;
428            combined.add_zero_count_facet_values(&vocabulary.facets);
429        }
430        Ok(())
431    }
432
433    pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
434        self.run_directory_request_bytes_with_options(
435            directory,
436            request,
437            NetdataFunctionRunOptions::default(),
438        )
439    }
440
441    pub fn run_directory_request_bytes_with_options(
442        &self,
443        directory: &Path,
444        request: &[u8],
445        options: NetdataFunctionRunOptions<'_>,
446    ) -> Result<Value> {
447        let request: Value = serde_json::from_slice(request).map_err(|err| {
448            SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
449        })?;
450        self.run_directory_request_json_with_options(directory, &request, options)
451    }
452
    /// Explores `files` one after another and merges everything into one
    /// `CombinedResult`.
    ///
    /// The page window, sampling state and realtime adjuster are created once and
    /// shared by all files. A file that cannot be opened is recorded as skipped and
    /// the loop moves on. The loop ends early when `should_stop_before_file` or
    /// `finish_explored_file` says so. After the loop, row payloads are expanded and
    /// the page counters are stored on the result.
    ///
    /// # Errors
    ///
    /// Propagates errors from `open_file_for_explore` and `finish_explored_file`.
    fn explore_files(
        &self,
        files: &[SelectedJournalFile],
        request: &NetdataRequest,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
    ) -> Result<CombinedResult> {
        // Request-wide query: only used below to decide whether sampling is enabled
        // and to size the shared sampling state. Each file gets its own query inside
        // the loop, which shadows this binding.
        let query = request.to_explorer_query(
            files.len() as u64,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let mut combined = CombinedResult::default();
        // Wrapped in RefCell because the explorer callbacks built in
        // `explore_single_file` only hold shared references to it.
        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
        combined.sampling_enabled = query.sampling.is_some();
        let mut sampling_state =
            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
        let started = Instant::now();
        let total_files = files.len();
        for (file_index, file) in files.iter().enumerate() {
            let path = &file.path;
            if should_stop_before_file(&mut combined, deadline, options) {
                break;
            }
            // `None` means the open failed; the helper has already recorded the skip.
            let Some(mut reader) = self.open_file_for_explore(
                path,
                &mut combined,
                options,
                progress_context(file_index, total_files, started),
            )?
            else {
                continue;
            };
            combined.matched_files = combined.matched_files.saturating_add(1);
            combined.matched_paths.push(path.clone());
            // Per-file query, built from this file's header and ordering data.
            let query = request.file_query(files.len(), reader.header(), &file.order);
            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
            let explored = self.explore_single_file(
                &mut reader,
                request,
                &query,
                deadline,
                options,
                &combined,
                &page_window,
                sampling_state.as_mut(),
                &realtime_adjuster,
                progress_context(file_index, total_files, started),
                file.order.journal_vs_realtime_delta_usec,
            );
            // `None` means there is nothing to merge for this file.
            // NOTE(review): presumably the helper recorded the exploration error on
            // `combined` — confirm in `record_explore_result`.
            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
            else {
                continue;
            };
            // `Ok(true)` asks to stop iterating over the remaining files.
            if finish_explored_file(
                options,
                request,
                file,
                &query,
                result,
                stop_reason,
                &mut combined,
                files,
                file_index,
                progress_context(file_index, total_files, started),
            )? {
                break;
            }
        }
        combined.expand_row_payloads(self.config.reader_options);
        combined.page_counters = Some(page_window.into_inner().counters());
        Ok(combined)
    }
527
528    fn open_file_for_explore(
529        &self,
530        path: &Path,
531        combined: &mut CombinedResult,
532        options: &mut NetdataFunctionRunOptions<'_>,
533        progress: ProgressContext,
534    ) -> Result<Option<FileReader>> {
535        match FileReader::open_with_options(path, self.config.reader_options) {
536            Ok(reader) => Ok(Some(reader)),
537            Err(err) => {
538                combined.skipped_files = combined.skipped_files.saturating_add(1);
539                combined
540                    .file_errors
541                    .push(format!("{}: {err}", path.display()));
542                emit_progress_for_combined(options, combined, progress);
543                Ok(None)
544            }
545        }
546    }
547
    /// Explores one opened journal file under an `ExplorerControl` wired to the
    /// request-wide state.
    ///
    /// The control receives the deadline, the caller's cancellation callback and
    /// progress interval, a progress callback, the shared sampling state and three
    /// row callbacks backed by `page_window`, `realtime_adjuster` and
    /// `delta_scan_can_stop`. Returns the explorer result together with the stop
    /// reason reported by the control.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the reader's explore call.
    // Every argument is request-wide state shared across files; it is passed in
    // individually rather than bundled, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn explore_single_file(
        &self,
        reader: &mut FileReader,
        request: &NetdataRequest,
        query: &ExplorerQuery,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &CombinedResult,
        page_window: &RefCell<NetdataPageWindow>,
        sampling_state: Option<&mut ExplorerSamplingState>,
        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
        progress: ProgressContext,
        realtime_delta_usec: u64,
    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
        // Copy these out first: the progress closure below borrows `options` mutably.
        let cancellation_callback = options.cancellation_callback;
        let progress_interval = options.progress_interval;
        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
            emit_explorer_progress(options, combined, explorer_progress, progress);
        };
        let mut control = ExplorerControl::new();
        control.set_deadline(deadline);
        control.set_cancellation_callback(cancellation_callback);
        control.set_progress_interval(progress_interval);
        control.set_progress_callback(Some(&mut explorer_progress));
        control.set_sampling_state(sampling_state);
        // Candidate callback: forwards the row timestamp to the shared page window.
        let mut candidate_row =
            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
        control.set_candidate_row_callback(Some(&mut candidate_row));
        // Realtime adjustment goes through the adjuster shared by all files.
        let mut adjust_realtime =
            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
        // Matched-row callback: forwards to `delta_scan_can_stop`.
        // NOTE(review): presumably a `true` result lets the explorer end the scan
        // early — confirm in `ExplorerControl`.
        let mut matched_row = |realtime_usec, rows_matched| {
            delta_scan_can_stop(
                request,
                page_window,
                realtime_usec,
                rows_matched,
                realtime_delta_usec,
            )
        };
        control.set_matched_row_callback(Some(&mut matched_row));
        let result = reader.explore_with_strategy_cursor_rows_controlled(
            query,
            self.config.explorer_strategy,
            &mut control,
        )?;
        Ok((result, control.stop_reason()))
    }
597
598    fn info_response(
599        &self,
600        echo: Value,
601        paths: &[PathBuf],
602        options: &NetdataFunctionRunOptions<'_>,
603    ) -> Value {
604        json!({
605            "_request": echo,
606            "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
607            "v": 3,
608            "accepted_params": self.accepted_params_from_fields(&[]),
609            "required_params": self.required_source_params(paths, options),
610            "show_ids": true,
611            "has_history": true,
612            "pagination": {
613                "enabled": true,
614                "key": "anchor",
615                "column": "timestamp",
616                "units": "timestamp_usec",
617            },
618            "status": 200,
619            "type": "table",
620            "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
621        })
622    }
623
624    fn query_response(
625        &self,
626        request: NetdataRequest,
627        annotation_paths: &[PathBuf],
628        combined: CombinedResult,
629    ) -> Value {
630        // Match Netdata systemd-journal.plugin: if files are newer but no
631        // useful rows survive the query, the function returns 304.
632        let not_modified = request.if_modified_since_usec != 0
633            && !combined.partial
634            && combined.stats.rows_matched == 0;
635        if combined.cancelled {
636            return netdata_function_error(499, "Request cancelled.");
637        }
638        if not_modified {
639            return netdata_function_error(304, "No new data since the previous call.");
640        }
641        let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
642        let mut response = base_query_response(&request, &combined, &artifacts);
643        let Some(object) = response.as_object_mut() else {
644            return netdata_function_error(500, "Internal Netdata function response error.");
645        };
646        self.add_query_response_metadata(object, &request, &combined, artifacts);
647        response
648    }
649
650    fn query_response_artifacts(
651        &self,
652        request: &NetdataRequest,
653        annotation_paths: &[PathBuf],
654        combined: &CombinedResult,
655    ) -> QueryResponseArtifacts {
656        let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
657        let reportable_facet_field_names = string_fields(&reportable_facet_fields);
658        let columns = self.build_columns(
659            &request,
660            &reportable_facet_fields,
661            &combined.rows,
662            &combined.facets,
663            &combined.column_fields,
664        );
665        let boot_ids = response_boot_ids(
666            &columns.order,
667            &combined.rows,
668            &combined.facets,
669            combined.histogram.as_ref(),
670        );
671        let context = DisplayContext {
672            boot_first_realtime: collect_boot_first_realtime(
673                annotation_paths,
674                self.config.reader_options,
675                &boot_ids,
676            ),
677            ..DisplayContext::default()
678        };
679        let data =
680            self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
681        let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
682        let histogram = combined.histogram.as_ref().map(|histogram| {
683            self.build_histogram(&context, histogram, combined.facets.get(&histogram.field))
684        });
685        let message = query_message(combined.timed_out, &combined.stats);
686        let items = response_items(request, combined, data.len() as u64);
687        QueryResponseArtifacts {
688            reportable_facet_field_names,
689            columns,
690            data,
691            facets,
692            histogram,
693            message,
694            items,
695        }
696    }
697
698    fn add_query_response_metadata(
699        &self,
700        object: &mut Map<String, Value>,
701        request: &NetdataRequest,
702        combined: &CombinedResult,
703        artifacts: QueryResponseArtifacts,
704    ) {
705        if !request.data_only {
706            self.add_full_query_response_metadata(object, request, combined, &artifacts);
707        } else if request.histogram.is_some() {
708            object.insert(
709                "available_histograms".to_string(),
710                self.available_histograms(request, combined),
711            );
712        }
713        add_last_modified_if_needed(object, request, combined);
714        add_sampling_if_needed(object, combined);
715        add_analysis_outputs_if_needed(object, request, artifacts);
716    }
717
718    fn add_full_query_response_metadata(
719        &self,
720        object: &mut Map<String, Value>,
721        request: &NetdataRequest,
722        combined: &CombinedResult,
723        artifacts: &QueryResponseArtifacts,
724    ) {
725        object.insert("message".to_string(), artifacts.message.clone());
726        object.insert("update_every".to_string(), Value::from(1));
727        object.insert("help".to_string(), Value::Null);
728        object.insert(
729            "accepted_params".to_string(),
730            self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
731        );
732        object.insert("default_sort_column".to_string(), Value::from("timestamp"));
733        object.insert("default_charts".to_string(), Value::Array(Vec::new()));
734        object.insert(
735            "available_histograms".to_string(),
736            self.available_histograms(request, combined),
737        );
738    }
739
740    fn build_columns(
741        &self,
742        request: &NetdataRequest,
743        reportable_facet_fields: &[Vec<u8>],
744        rows: &[LocatedRow],
745        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
746        column_fields: &BTreeSet<String>,
747    ) -> Columns {
748        let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
749        push_unique_many(&mut order, &self.config.default_view_keys);
750        push_unique_many(&mut order, &string_fields(reportable_facet_fields));
751        if let Some(histogram) = &request.histogram {
752            push_unique(&mut order, histogram);
753        }
754        for field in column_fields {
755            push_unique(&mut order, field);
756        }
757
758        for (field, values) in facets {
759            if !facet_group_is_reportable(values) {
760                continue;
761            }
762            push_unique(&mut order, &String::from_utf8_lossy(field));
763        }
764        for row in rows {
765            let fields = row_fields(row);
766            for field in fields.keys() {
767                push_unique(&mut order, field);
768            }
769        }
770
771        let mut map = Map::new();
772        for (index, key) in order.iter().enumerate() {
773            map.insert(key.clone(), column_metadata(key, index));
774        }
775        Columns { order, map }
776    }
777
778    fn build_data_rows(
779        &self,
780        context: &DisplayContext,
781        column_order: &[String],
782        rows: &[LocatedRow],
783        direction: Direction,
784    ) -> Vec<Value> {
785        let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
786            Direction::Forward => Box::new(rows.iter().rev()),
787            Direction::Backward => Box::new(rows.iter()),
788        };
789        row_iter
790            .map(|located| {
791                let fields = row_fields(located);
792                let mut row = Vec::with_capacity(column_order.len());
793                for column in column_order {
794                    let value = match column.as_str() {
795                        "timestamp" => Value::from(located.row.realtime_usec),
796                        "rowOptions" => self.profile.row_options(&fields),
797                        field => first_value(&fields, field)
798                            .map(|value| {
799                                self.profile.field_display_value(
800                                    context,
801                                    DisplayScope::Data,
802                                    field,
803                                    value,
804                                )
805                            })
806                            .unwrap_or(Value::Null),
807                    };
808                    row.push(value);
809                }
810                Value::Array(row)
811            })
812            .collect()
813    }
814
815    fn build_facets(
816        &self,
817        context: &DisplayContext,
818        requested: &[Vec<u8>],
819        facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
820    ) -> Value {
821        let mut out = Vec::new();
822        for (order, field) in requested.iter().enumerate() {
823            let values = facets.get(field);
824            let field_name = String::from_utf8_lossy(field).into_owned();
825            let mut options: Vec<_> = values
826                .into_iter()
827                .flat_map(|values| values.iter())
828                .filter(|(value, count)| {
829                    (!value.is_empty() && value.as_slice() != b"-")
830                        || (**count == 0 && value.is_empty())
831                })
832                .map(|(value, count)| {
833                    if *count == 0 && value.is_empty() {
834                        return json!({
835                            "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
836                            "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
837                            "count": count,
838                        });
839                    }
840                    json!({
841                        "id": String::from_utf8_lossy(value).into_owned(),
842                        "name": self.profile.facet_option_name(context, &field_name, value),
843                        "count": count,
844                    })
845                })
846                .collect();
847            sort_facet_options(&field_name, &mut options);
848            for (idx, option) in options.iter_mut().enumerate() {
849                if let Some(object) = option.as_object_mut() {
850                    object.insert("order".to_string(), Value::from((idx + 1) as u64));
851                }
852            }
853            out.push(json!({
854                "id": field_name,
855                "name": String::from_utf8_lossy(field).into_owned(),
856                "order": order + 1,
857                "options": options,
858            }));
859        }
860        Value::Array(out)
861    }
862
863    fn build_histogram(
864        &self,
865        context: &DisplayContext,
866        histogram: &ExplorerHistogram,
867        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
868    ) -> Value {
869        let field = String::from_utf8_lossy(&histogram.field).into_owned();
870        let mut dimension_ids = BTreeSet::new();
871        let mut buckets = Vec::with_capacity(histogram.buckets.len());
872        for bucket in &histogram.buckets {
873            let mut values = BTreeMap::new();
874            for (value, count) in &bucket.values {
875                add_netdata_facet_count(&mut values, value, *count);
876            }
877            for value in values.keys() {
878                dimension_ids.insert(value.clone());
879            }
880            buckets.push((bucket.start_realtime_usec, values));
881        }
882        let actual_dimension_ids = dimension_ids.clone();
883        if let Some(known_values) = known_values {
884            for value in known_values.keys() {
885                if value.is_empty() || value.as_slice() == b"-" {
886                    continue;
887                }
888                dimension_ids.insert(value.clone());
889            }
890        }
891        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
892        let labels: Vec<Value> = std::iter::once(Value::String("time".to_string()))
893            .chain(dimension_ids.iter().map(|value| {
894                match self.profile.field_display_value(
895                    context,
896                    DisplayScope::Histogram,
897                    &field,
898                    value,
899                ) {
900                    Value::String(value) => Value::String(value),
901                    other => Value::String(other.to_string()),
902                }
903            }))
904            .collect();
905        let data: Vec<Value> = buckets
906            .iter()
907            .map(|(start_realtime_usec, values)| {
908                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
909                point.push(Value::from(start_realtime_usec / 1000));
910                for value in &dimension_ids {
911                    let count = values
912                        .get(value)
913                        .copied()
914                        .map(Value::from)
915                        .unwrap_or_else(|| {
916                            if actual_dimension_ids.contains(value) {
917                                Value::from(0)
918                            } else {
919                                Value::Null
920                            }
921                        });
922                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
923                }
924                Value::Array(point)
925            })
926            .collect();
927
928        json!({
929            "id": field,
930            "name": field,
931            "chart": {
932                "result": {
933                    "labels": labels,
934                    "point": { "value": 0, "arp": 1, "pa": 2 },
935                    "data": data,
936                },
937                "view": {
938                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
939                    "update_every": histogram_update_every_seconds(histogram),
940                    "units": "events",
941                    "chart_type": "stackedBar",
942                }
943            }
944        })
945    }
946
947    fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
948        NETDATA_ACCEPTED_PARAMS
949            .iter()
950            .copied()
951            .chain(fields.iter().map(String::as_str))
952            .map(|field| Value::String(field.to_string()))
953            .collect()
954    }
955
956    fn required_source_params(
957        &self,
958        paths: &[PathBuf],
959        options: &NetdataFunctionRunOptions<'_>,
960    ) -> Value {
961        let mut all = JournalSourceSummary::default();
962        let mut local = JournalSourceSummary::default();
963        let mut local_namespaces = JournalSourceSummary::default();
964        let mut local_system = JournalSourceSummary::default();
965        let mut local_user = JournalSourceSummary::default();
966        let mut remote = JournalSourceSummary::default();
967        let mut other = JournalSourceSummary::default();
968        let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
969
970        for path in paths {
971            let metadata = file_metadata(options, path);
972            let source_type = metadata
973                .as_ref()
974                .and_then(|metadata| metadata.source_type)
975                .unwrap_or_else(|| journal_file_source_type(path));
976            all.add_path(path, self.config.reader_options, metadata.as_ref());
977            if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
978                local.add_path(path, self.config.reader_options, metadata.as_ref());
979            }
980            if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
981                local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
982            }
983            if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
984                local_system.add_path(path, self.config.reader_options, metadata.as_ref());
985            }
986            if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
987                local_user.add_path(path, self.config.reader_options, metadata.as_ref());
988            }
989            if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
990                remote.add_path(path, self.config.reader_options, metadata.as_ref());
991            }
992            if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
993                other.add_path(path, self.config.reader_options, metadata.as_ref());
994            }
995            let source_name = metadata
996                .as_ref()
997                .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
998                .or_else(|| journal_file_exact_source_name(path));
999            if let Some(source_name) = source_name {
1000                exact.entry(source_name).or_default().add_path(
1001                    path,
1002                    self.config.reader_options,
1003                    metadata.as_ref(),
1004                );
1005            }
1006        }
1007
1008        let mut source_options = Vec::new();
1009        push_source_option(&mut source_options, "all", &all);
1010        push_source_option(&mut source_options, "all-local-logs", &local);
1011        push_source_option(
1012            &mut source_options,
1013            "all-local-namespaces",
1014            &local_namespaces,
1015        );
1016        push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1017        push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1018        push_source_option(&mut source_options, "all-remote-systems", &remote);
1019        push_source_option(&mut source_options, "all-uncategorized", &other);
1020        for (name, summary) in exact {
1021            push_source_option(&mut source_options, &name, &summary);
1022        }
1023
1024        json!([{
1025            "id": "__logs_sources",
1026            "name": "Journal Sources",
1027            "help": "Select the logs source to query",
1028            "type": "multiselect",
1029            "options": source_options,
1030        }])
1031    }
1032
1033    fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1034        let mut fields = combined.reportable_facet_fields(&request.facets);
1035        if request.data_only {
1036            if let Some(histogram) = &request.histogram {
1037                push_unique(&mut fields, histogram);
1038            }
1039        }
1040        let mut sorted = fields.clone();
1041        sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1042        let order_by_field: BTreeMap<String, usize> = sorted
1043            .into_iter()
1044            .enumerate()
1045            .map(|(index, field)| (field, index + 1))
1046            .collect();
1047
1048        fields
1049            .into_iter()
1050            .map(|field| {
1051                let order = order_by_field.get(&field).copied().unwrap_or(0);
1052                json!({
1053                    "id": field,
1054                    "name": field,
1055                    "order": order,
1056                })
1057            })
1058            .collect()
1059    }
1060}
1061
/// Parsed and normalized form of a Netdata function request, as produced by
/// `NetdataRequest::parse` from the request's JSON object.
1062#[derive(Debug, Clone)]
1063struct NetdataRequest {
    /// Value of the `info` key; `false` when absent.
1064    info: bool,
    /// Normalized echo of the request, built by `normalized_request_echo`.
1065    echo: Value,
    /// Start of the time window, as returned by `normalize_time_window` for `after`.
1066    after_realtime_usec: Option<u64>,
    /// End of the time window, as returned by `normalize_time_window` for `before`.
1067    before_realtime_usec: Option<u64>,
    /// Value of the `if_modified_since` key; `0` when absent.
1068    if_modified_since_usec: u64,
    /// Anchor chosen by `request_anchor_and_direction`.
1069    anchor: ExplorerAnchor,
    /// Direction chosen by `request_anchor_and_direction`.
1070    direction: Direction,
    /// Requested limit raised to at least 2 (the echo keeps the requested value).
1071    limit: usize,
    /// Value of the `data_only` key; `false` when absent.
1072    data_only: bool,
    /// Result of `request_delta`.
1073    delta: bool,
    /// Result of `request_tail`.
1074    tail: bool,
    /// Value of the `sampling` key, defaulting to `DEFAULT_ITEMS_SAMPLING`; `0` disables
    /// sampling in `to_explorer_query`.
1075    sampling: u64,
    /// Bitmask of `SOURCE_TYPE_*` flags taken from the parsed source selection.
1076    source_type: u64,
    /// Source names taken from the parsed source selection; matched against a file's
    /// exact source name in `matches_source`.
1077    exact_sources: Vec<String>,
    /// Filters parsed from the `selections` key.
1078    filters: Vec<ExplorerFilter>,
    /// Facet fields resolved by `request_facets` from the `facets` key and the config.
1079    facets: Vec<Vec<u8>>,
    /// Histogram field resolved by `request_histogram_or_default`.
1080    histogram: Option<String>,
    /// First element returned by `parse_fts_query_patterns`; empty without a query.
1081    fts_terms: Vec<ExplorerFtsPattern>,
    /// Second element returned by `parse_fts_query_patterns`; empty without a query.
1082    fts_patterns: Vec<Vec<u8>>,
    /// Third element returned by `parse_fts_query_patterns`; empty without a query.
1083    fts_negative_patterns: Vec<Vec<u8>>,
1084}
1085
1086impl NetdataRequest {
1087    fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1088        let object = value.as_object().ok_or_else(|| {
1089            SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1090        })?;
1091        let now_seconds = unix_now_seconds();
1092        let info = get_bool(object, "info").unwrap_or(false);
1093        let after = get_i64(object, "after");
1094        let before = get_i64(object, "before");
1095        let (after_realtime_usec, before_realtime_usec) =
1096            normalize_time_window(now_seconds, after, before);
1097        let direction = request_direction(object);
1098        let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1099        let data_only = get_bool(object, "data_only").unwrap_or(false);
1100        let delta = request_delta(data_only, object);
1101        let tail = request_tail(data_only, if_modified_since_usec, object);
1102        let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1103        let (anchor, direction) = request_anchor_and_direction(
1104            object,
1105            tail,
1106            direction,
1107            after_realtime_usec,
1108            before_realtime_usec,
1109        );
1110        let requested_limit = request_limit(object);
1111        let limit = requested_limit.max(2);
1112        let requested_facets = parse_string_array(object.get("facets"));
1113        let facets = request_facets(&requested_facets, config);
1114        let requested_histogram = request_histogram(object);
1115        let histogram = request_histogram_or_default(&requested_histogram, config);
1116        let requested_query = request_query(object);
1117        let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1118            .as_deref()
1119            .map(parse_fts_query_patterns)
1120            .unwrap_or_default();
1121        let source_selection = parse_source_selection(object.get("selections"));
1122        let filters = parse_filters(object.get("selections"));
1123
1124        let echo_input = RequestEchoInput {
1125            info,
1126            after_realtime_usec,
1127            before_realtime_usec,
1128            if_modified_since_usec,
1129            anchor,
1130            direction,
1131            limit: requested_limit,
1132            data_only,
1133            delta,
1134            tail,
1135            sampling,
1136            source_type: source_selection.source_type,
1137            requested_facets: requested_facets.as_deref(),
1138            selections: object.get("selections"),
1139            histogram: requested_histogram.as_deref(),
1140            query: requested_query.as_deref(),
1141        };
1142        let echo = normalized_request_echo(&echo_input);
1143
1144        Ok(Self {
1145            info,
1146            echo,
1147            after_realtime_usec,
1148            before_realtime_usec,
1149            if_modified_since_usec,
1150            anchor,
1151            direction,
1152            limit,
1153            data_only,
1154            delta,
1155            tail,
1156            sampling,
1157            source_type: source_selection.source_type,
1158            exact_sources: source_selection.exact_sources,
1159            filters,
1160            facets,
1161            histogram,
1162            fts_terms,
1163            fts_patterns,
1164            fts_negative_patterns,
1165        })
1166    }
1167
1168    fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1169        if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1170            return true;
1171        }
1172        if self.source_type & SOURCE_TYPE_ALL != 0 {
1173            return true;
1174        }
1175        let file_source_type = metadata
1176            .and_then(|metadata| metadata.source_type)
1177            .unwrap_or_else(|| journal_file_source_type(path));
1178        if file_source_type & self.source_type != 0 {
1179            return true;
1180        }
1181        if self.exact_sources.is_empty() {
1182            return false;
1183        }
1184        let source_name = metadata
1185            .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1186            .or_else(|| journal_file_exact_source_name(path));
1187        self.exact_sources
1188            .iter()
1189            .any(|source| source_name.as_deref() == Some(source.as_str()))
1190    }
1191
1192    fn to_explorer_query(
1193        &self,
1194        matched_files: u64,
1195        file_header: Option<FileHeader>,
1196        realtime_slack_usec: u64,
1197    ) -> ExplorerQuery {
1198        let analysis_enabled = !self.data_only || self.delta;
1199        let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1200        let sampling = (analysis_enabled
1201            && self.sampling != 0
1202            && matched_files != 0
1203            && self.after_realtime_usec.is_some()
1204            && self.before_realtime_usec.is_some())
1205        .then(|| {
1206            let header = file_header.unwrap_or(FileHeader {
1207                signature: [0; 8],
1208                compatible_flags: 0,
1209                incompatible_flags: 0,
1210                state: 0,
1211                header_size: 0,
1212                n_entries: 0,
1213                head_entry_realtime: 0,
1214                tail_entry_realtime: 0,
1215                head_entry_seqnum: 0,
1216                tail_entry_seqnum: 0,
1217                tail_entry_boot_id: [0; 16],
1218                seqnum_id: [0; 16],
1219            });
1220            let messages_in_file = header
1221                .tail_entry_seqnum
1222                .checked_sub(header.head_entry_seqnum)
1223                .and_then(|span| span.checked_add(1))
1224                .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1225                .unwrap_or(header.n_entries);
1226            ExplorerSampling {
1227                budget: self.sampling,
1228                matched_files,
1229                file_head_realtime_usec: header.head_entry_realtime,
1230                file_tail_realtime_usec: header.tail_entry_realtime,
1231                file_head_seqnum: header.head_entry_seqnum,
1232                file_tail_seqnum: header.tail_entry_seqnum,
1233                file_entries: messages_in_file,
1234            }
1235        });
1236        ExplorerQuery {
1237            after_realtime_usec: self.after_realtime_usec,
1238            before_realtime_usec: self.before_realtime_usec,
1239            anchor: self.anchor,
1240            direction: self.direction,
1241            limit: self.limit,
1242            filters: self.filters.clone(),
1243            facets: analysis_enabled
1244                .then(|| self.facets.clone())
1245                .unwrap_or_default(),
1246            histogram: analysis_enabled
1247                .then(|| {
1248                    self.histogram
1249                        .as_ref()
1250                        .map(|field| field.as_bytes().to_vec())
1251                })
1252                .flatten(),
1253            histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1254            fts_terms: self.fts_terms.clone(),
1255            fts_patterns: self.fts_patterns.clone(),
1256            fts_negative_patterns: self.fts_negative_patterns.clone(),
1257            field_mode: ExplorerFieldMode::FirstValue,
1258            exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1259            use_source_realtime: true,
1260            realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1261            stop_when_rows_full: self.data_only && !tail_anchor,
1262            stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1263            sampling,
1264            debug_collect_column_fields_by_row_traversal: false,
1265        }
1266    }
1267
1268    fn file_query(
1269        &self,
1270        matched_files: usize,
1271        file_header: FileHeader,
1272        order: &JournalFileOrderInfo,
1273    ) -> ExplorerQuery {
1274        let mut query = self.to_explorer_query(
1275            matched_files as u64,
1276            Some(file_header),
1277            order.journal_vs_realtime_delta_usec,
1278        );
1279        if self.data_only && self.delta {
1280            query.stop_when_rows_full = false;
1281        }
1282        query
1283    }
1284
1285    fn unfiltered_vocabulary(&self) -> Self {
1286        let mut request = self.clone();
1287        request.filters.clear();
1288        request.histogram = None;
1289        request.limit = 0;
1290        request.fts_terms.clear();
1291        request.fts_patterns.clear();
1292        request.fts_negative_patterns.clear();
1293        request
1294    }
1295
1296    fn distinct_filter_fields(&self) -> usize {
1297        self.filters
1298            .iter()
1299            .map(|filter| filter.field.as_slice())
1300            .collect::<HashSet<_>>()
1301            .len()
1302    }
1303}
1304
/// A result row together with the journal file it came from.
1305#[derive(Debug, Clone)]
1306struct LocatedRow {
    /// Journal file holding the row; `expand_located_row_payloads` reopens it.
1307    file_path: PathBuf,
    /// The row itself; its `cursor` is used to find the entry again and its `payloads`
    /// are filled in by `expand_located_row_payloads`.
1308    row: ExplorerRow,
1309}
1310
/// Hands out adjusted timestamps so that a timestamp colliding with the ones just handed
/// out is shifted by one microsecond per collision (see `adjust`).
1311#[derive(Debug)]
1312struct NetdataRealtimeAdjuster {
    /// Decides which end of the range moves on a collision: `Backward` lowers
    /// `last_realtime_from`, `Forward` raises `last_realtime_to`.
1313    direction: Direction,
    /// Inclusive lower end of the range of timestamps treated as collisions.
1314    last_realtime_from: u64,
    /// Inclusive upper end of the range of timestamps treated as collisions.
1315    last_realtime_to: u64,
1316}
1317
1318impl NetdataRealtimeAdjuster {
1319    fn new(direction: Direction) -> Self {
1320        Self {
1321            direction,
1322            last_realtime_from: 0,
1323            last_realtime_to: 0,
1324        }
1325    }
1326
1327    fn adjust(&mut self, realtime_usec: u64) -> u64 {
1328        match self.direction {
1329            Direction::Backward => {
1330                if realtime_usec >= self.last_realtime_from
1331                    && realtime_usec <= self.last_realtime_to
1332                {
1333                    self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1334                    self.last_realtime_from
1335                } else {
1336                    self.last_realtime_from = realtime_usec;
1337                    self.last_realtime_to = realtime_usec;
1338                    realtime_usec
1339                }
1340            }
1341            Direction::Forward => {
1342                if realtime_usec >= self.last_realtime_from
1343                    && realtime_usec <= self.last_realtime_to
1344                {
1345                    self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1346                    self.last_realtime_to
1347                } else {
1348                    self.last_realtime_from = realtime_usec;
1349                    self.last_realtime_to = realtime_usec;
1350                    realtime_usec
1351                }
1352            }
1353        }
1354    }
1355}
1356
/// Running totals for a group of journal files, rendered by `info` for the source
/// selector.
1357#[derive(Debug, Default)]
1358struct JournalSourceSummary {
    /// Number of added paths whose filesystem metadata could be read.
1359    files: u64,
    /// Sum of the sizes reported by the filesystem for those paths (saturating).
1360    total_size: u64,
    /// Smallest first-entry timestamp seen so far, if any.
1361    first_realtime_usec: Option<u64>,
    /// Largest last-entry timestamp seen so far, if any.
1362    last_realtime_usec: Option<u64>,
1363}
1364
1365impl JournalSourceSummary {
1366    #[cfg(test)]
1367    fn from_paths(
1368        paths: &[PathBuf],
1369        reader_options: ReaderOptions,
1370        options: &NetdataFunctionRunOptions<'_>,
1371    ) -> Self {
1372        let mut summary = Self::default();
1373        for path in paths {
1374            let metadata = file_metadata(options, path);
1375            summary.add_path(path, reader_options, metadata.as_ref());
1376        }
1377        summary
1378    }
1379
1380    fn add_path(
1381        &mut self,
1382        path: &Path,
1383        reader_options: ReaderOptions,
1384        metadata: Option<&NetdataJournalFileMetadata>,
1385    ) {
1386        if let Ok(metadata) = std::fs::metadata(path) {
1387            self.files = self.files.saturating_add(1);
1388            self.total_size = self.total_size.saturating_add(metadata.len());
1389        }
1390        if let Some(metadata) = metadata {
1391            let metadata_first = metadata.msg_first_realtime_usec;
1392            let metadata_last = metadata.msg_last_realtime_usec;
1393            if let Some(first) = metadata_first {
1394                self.first_realtime_usec = Some(
1395                    self.first_realtime_usec
1396                        .map_or(first, |current| current.min(first)),
1397                );
1398            }
1399            if let Some(last) = metadata_last {
1400                self.last_realtime_usec = Some(
1401                    self.last_realtime_usec
1402                        .map_or(last, |current| current.max(last)),
1403                );
1404            }
1405            if metadata_first.is_some() && metadata_last.is_some() {
1406                return;
1407            }
1408        }
1409        let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1410            return;
1411        };
1412        let header = reader.header();
1413        if header.head_entry_realtime != 0 {
1414            self.first_realtime_usec = Some(
1415                self.first_realtime_usec
1416                    .map_or(header.head_entry_realtime, |current| {
1417                        current.min(header.head_entry_realtime)
1418                    }),
1419            );
1420        }
1421        if header.tail_entry_realtime != 0 {
1422            self.last_realtime_usec = Some(
1423                self.last_realtime_usec
1424                    .map_or(header.tail_entry_realtime, |current| {
1425                        current.max(header.tail_entry_realtime)
1426                    }),
1427            );
1428        }
1429    }
1430
1431    fn info(&self) -> String {
1432        let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1433            (Some(first), Some(last)) if last >= first => {
1434                human_duration_seconds((last - first) / 1_000_000)
1435            }
1436            _ => "0s".to_string(),
1437        };
1438        let last_entry = self
1439            .last_realtime_usec
1440            .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1441            .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1442            .unwrap_or_else(|| "unknown".to_string());
1443        format!(
1444            "{} files, total size {}, covering {}, last entry at {}",
1445            self.files,
1446            human_binary_size(self.total_size),
1447            coverage,
1448            last_entry
1449        )
1450    }
1451}
1452
1453fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1454    if summary.files == 0 {
1455        return;
1456    }
1457    target.push(json!({
1458        "id": id,
1459        "name": id,
1460        "info": summary.info(),
1461        "pill": human_binary_size(summary.total_size),
1462    }));
1463}
1464
1465fn expand_located_row_payloads(
1466    located: &mut LocatedRow,
1467    reader_options: ReaderOptions,
1468) -> Result<()> {
1469    let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1470    reader.seek_cursor(&located.row.cursor)?;
1471    if !reader.test_cursor(&located.row.cursor)? {
1472        return Err(SdkError::InvalidCursor(format!(
1473            "selected row cursor is no longer available: {}",
1474            located.row.cursor
1475        )));
1476    }
1477    reader.collect_entry_payloads(&mut located.row.payloads)
1478}
1479
/// Accumulated result of a query.
///
/// NOTE(review): the code that fills this struct is outside this excerpt; the field notes
/// below are read off the names and types only and should be confirmed against it.
1480#[derive(Debug, Default)]
1481struct CombinedResult {
    /// Rows, each tagged with the journal file it came from.
1482    rows: Vec<LocatedRow>,
    /// Per-field value counts; same shape as the `facets` argument of `build_facets`.
1483    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1484    histogram: Option<ExplorerHistogram>,
1485    column_fields: BTreeSet<String>,
1486    stats: ExplorerStats,
1487    page_counters: Option<NetdataPageCounters>,
1488    matched_files: u64,
1489    matched_paths: Vec<PathBuf>,
1490    skipped_files: u64,
1491    file_errors: Vec<String>,
1492    partial: bool,
1493    timed_out: bool,
1494    cancelled: bool,
1495    sampling_enabled: bool,
1496}
1497
/// Three counters attached to a paged result.
///
/// NOTE(review): presumably a snapshot of `NetdataPageWindow`'s `matched`, `skips_before`
/// and `skips_after`; the code that fills it is outside this excerpt, so confirm there.
1498#[derive(Clone, Copy, Debug, Default)]
1499struct NetdataPageCounters {
1500    matched: u64,
1501    before: u64,
1502    after: u64,
1503}
1504
/// Small copyable bundle of progress information.
///
/// NOTE(review): the users of this struct are outside this excerpt; by the field names it
/// carries the position within the file list and the instant the run started; confirm.
1505#[derive(Clone, Copy)]
1506struct ProgressContext {
1507    current_file: usize,
1508    total_files: usize,
1509    started: Instant,
1510}
1511
/// Heap of retained entry timestamps used by `NetdataPageWindow`, one variant per
/// direction.
1512#[derive(Debug)]
1513enum NetdataPageHeap {
    /// Timestamps wrapped in `Reverse`, so the heap's top is the smallest retained one.
1514    Backward(BinaryHeap<Reverse<u64>>),
    /// Plain timestamps, so the heap's top is the largest retained one.
1515    Forward(BinaryHeap<u64>),
1516}
1517
/// Tracks which entry timestamps fall into the page of at most `limit` entries for a
/// request, along with counters for matched, skipped and replaced entries.
1518#[derive(Debug)]
1519struct NetdataPageWindow {
    /// Paging direction copied from the request.
1520    direction: Direction,
    /// Timestamp of the request's realtime anchor, if it has one. Entries at or beyond it
    /// (relative to the direction) are rejected.
1521    anchor_start_usec: Option<u64>,
    /// Optional far bound checked the same way; `for_request` leaves it unset.
1522    anchor_stop_usec: Option<u64>,
    /// Maximum number of retained timestamps; zero disables the window.
1523    limit: usize,
    /// Retained timestamps.
1524    heap: NetdataPageHeap,
    /// Smallest retained timestamp, kept in sync with `heap`.
1525    oldest_retained_usec: Option<u64>,
    /// Largest retained timestamp, kept in sync with `heap`.
1526    newest_retained_usec: Option<u64>,
    /// Entries that passed the anchor check in `observe` while `limit` was non-zero.
1527    matched: u64,
    /// Entries counted as skipped before the page.
1528    skips_before: u64,
    /// Entries counted as skipped after the page.
1529    skips_after: u64,
    /// Times a retained timestamp was replaced because the heap was already full.
1530    shifts: u64,
1531}
1532
1533impl NetdataPageWindow {
1534    fn for_request(request: &NetdataRequest) -> Self {
1535        let anchor_start_usec = match request.anchor {
1536            ExplorerAnchor::Realtime(anchor) => Some(anchor),
1537            _ => None,
1538        };
1539        let heap = match request.direction {
1540            Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1541            Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1542        };
1543        Self {
1544            direction: request.direction,
1545            anchor_start_usec,
1546            anchor_stop_usec: None,
1547            limit: request.limit,
1548            heap,
1549            oldest_retained_usec: None,
1550            newest_retained_usec: None,
1551            matched: 0,
1552            skips_before: 0,
1553            skips_after: 0,
1554            shifts: 0,
1555        }
1556    }
1557
1558    fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1559        if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1560            return false;
1561        }
1562        if self.retained_len() < self.limit {
1563            return true;
1564        }
1565        self.oldest_retained_usec
1566            .zip(self.newest_retained_usec)
1567            .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1568    }
1569
1570    fn observe(&mut self, realtime_usec: u64) {
1571        if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1572            return;
1573        }
1574        self.matched = self.matched.saturating_add(1);
1575        match (&mut self.heap, self.direction) {
1576            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1577                if heap.len() < self.limit {
1578                    heap.push(Reverse(realtime_usec));
1579                    self.add_retained_bound(realtime_usec);
1580                    return;
1581                }
1582                let Some(Reverse(oldest)) = heap.peek().copied() else {
1583                    heap.push(Reverse(realtime_usec));
1584                    self.refresh_retained_bounds();
1585                    return;
1586                };
1587                if realtime_usec < oldest {
1588                    self.skips_after = self.skips_after.saturating_add(1);
1589                    return;
1590                }
1591                heap.pop();
1592                heap.push(Reverse(realtime_usec));
1593                self.refresh_retained_bounds();
1594                self.shifts = self.shifts.saturating_add(1);
1595            }
1596            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1597                if heap.len() < self.limit {
1598                    heap.push(realtime_usec);
1599                    self.add_retained_bound(realtime_usec);
1600                    return;
1601                }
1602                let Some(newest) = heap.peek().copied() else {
1603                    heap.push(realtime_usec);
1604                    self.refresh_retained_bounds();
1605                    return;
1606                };
1607                if realtime_usec > newest {
1608                    self.skips_before = self.skips_before.saturating_add(1);
1609                    return;
1610                }
1611                heap.pop();
1612                heap.push(realtime_usec);
1613                self.refresh_retained_bounds();
1614                self.shifts = self.shifts.saturating_add(1);
1615            }
1616            _ => {}
1617        }
1618    }
1619
1620    fn retained_len(&self) -> usize {
1621        match &self.heap {
1622            NetdataPageHeap::Backward(heap) => heap.len(),
1623            NetdataPageHeap::Forward(heap) => heap.len(),
1624        }
1625    }
1626
1627    fn add_retained_bound(&mut self, realtime_usec: u64) {
1628        self.oldest_retained_usec = Some(
1629            self.oldest_retained_usec
1630                .map_or(realtime_usec, |current| current.min(realtime_usec)),
1631        );
1632        self.newest_retained_usec = Some(
1633            self.newest_retained_usec
1634                .map_or(realtime_usec, |current| current.max(realtime_usec)),
1635        );
1636    }
1637
1638    fn refresh_retained_bounds(&mut self) {
1639        let (oldest, newest) = match &self.heap {
1640            NetdataPageHeap::Backward(heap) => heap
1641                .iter()
1642                .map(|Reverse(usec)| *usec)
1643                .fold((None, None), retained_bounds_fold),
1644            NetdataPageHeap::Forward(heap) => heap
1645                .iter()
1646                .copied()
1647                .fold((None, None), retained_bounds_fold),
1648        };
1649        self.oldest_retained_usec = oldest;
1650        self.newest_retained_usec = newest;
1651    }
1652
1653    fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1654        match self.direction {
1655            Direction::Backward => {
1656                if self
1657                    .anchor_start_usec
1658                    .is_some_and(|anchor| realtime_usec >= anchor)
1659                {
1660                    self.skips_before = self.skips_before.saturating_add(1);
1661                    return false;
1662                }
1663                if self
1664                    .anchor_stop_usec
1665                    .is_some_and(|anchor| realtime_usec <= anchor)
1666                {
1667                    self.skips_after = self.skips_after.saturating_add(1);
1668                    return false;
1669                }
1670            }
1671            Direction::Forward => {
1672                if self
1673                    .anchor_start_usec
1674                    .is_some_and(|anchor| realtime_usec <= anchor)
1675                {
1676                    self.skips_after = self.skips_after.saturating_add(1);
1677                    return false;
1678                }
1679                if self
1680                    .anchor_stop_usec
1681                    .is_some_and(|anchor| realtime_usec >= anchor)
1682                {
1683                    self.skips_before = self.skips_before.saturating_add(1);
1684                    return false;
1685                }
1686            }
1687        }
1688        true
1689    }
1690
1691    fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
1692        match self.direction {
1693            Direction::Backward => {
1694                if self
1695                    .anchor_start_usec
1696                    .is_some_and(|anchor| realtime_usec >= anchor)
1697                {
1698                    return false;
1699                }
1700                if self
1701                    .anchor_stop_usec
1702                    .is_some_and(|anchor| realtime_usec <= anchor)
1703                {
1704                    return false;
1705                }
1706            }
1707            Direction::Forward => {
1708                if self
1709                    .anchor_start_usec
1710                    .is_some_and(|anchor| realtime_usec <= anchor)
1711                {
1712                    return false;
1713                }
1714                if self
1715                    .anchor_stop_usec
1716                    .is_some_and(|anchor| realtime_usec >= anchor)
1717                {
1718                    return false;
1719                }
1720            }
1721        }
1722        true
1723    }
1724
1725    fn counters(&self) -> NetdataPageCounters {
1726        NetdataPageCounters {
1727            matched: self.matched,
1728            before: self.skips_before,
1729            after: self.skips_after.saturating_add(self.shifts),
1730        }
1731    }
1732
1733    fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
1734        if self.limit == 0 {
1735            return false;
1736        }
1737        match (&self.heap, self.direction) {
1738            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1739                if heap.len() < self.limit {
1740                    return false;
1741                }
1742                heap.peek().is_some_and(|Reverse(oldest)| {
1743                    realtime_usec < oldest.saturating_sub(slack_usec)
1744                })
1745            }
1746            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1747                if heap.len() < self.limit {
1748                    return false;
1749                }
1750                heap.peek()
1751                    .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
1752            }
1753            _ => false,
1754        }
1755    }
1756}
1757
/// Fold step that extends an `(oldest, newest)` pair so it covers `realtime_usec`.
///
/// Either bound may be `None` on input; both are always `Some` on output.
fn retained_bounds_fold(
    (oldest, newest): (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let lowest = match oldest {
        Some(current) if current < realtime_usec => current,
        _ => realtime_usec,
    };
    let highest = match newest {
        Some(current) if current > realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(lowest), Some(highest))
}
1767
1768impl CombinedResult {
1769    fn merge(
1770        &mut self,
1771        path: &Path,
1772        result: ExplorerResult,
1773        direction: Direction,
1774        limit: usize,
1775    ) -> Result<()> {
1776        let ExplorerResult {
1777            rows,
1778            facets,
1779            histogram,
1780            stats,
1781            column_fields,
1782            ..
1783        } = result;
1784
1785        if let Some(histogram) = histogram {
1786            merge_histogram(&mut self.histogram, histogram)?;
1787        }
1788
1789        self.merge_stats(stats);
1790        for row in rows {
1791            self.rows.push(LocatedRow {
1792                file_path: path.to_path_buf(),
1793                row,
1794            });
1795        }
1796        for field in column_fields {
1797            if let Ok(field) = String::from_utf8(field) {
1798                self.column_fields.insert(field);
1799            }
1800        }
1801        for (field, values) in facets {
1802            let target = self.facets.entry(field).or_default();
1803            for (value, count) in values {
1804                add_netdata_facet_count(target, &value, count);
1805            }
1806        }
1807        self.sort_and_limit(direction, limit);
1808        Ok(())
1809    }
1810
1811    fn add_column_fields<I>(&mut self, fields: I)
1812    where
1813        I: IntoIterator<Item = String>,
1814    {
1815        self.column_fields.extend(fields);
1816    }
1817
1818    fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
1819        match direction {
1820            Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
1821            Direction::Backward => self
1822                .rows
1823                .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
1824        }
1825        make_row_timestamps_unique(&mut self.rows, direction);
1826        if self.rows.len() > limit {
1827            self.rows.truncate(limit);
1828        }
1829        self.stats.rows_returned = self.rows.len() as u64;
1830    }
1831
1832    fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
1833        if self.rows.is_empty() {
1834            self.stats.rows_returned = 0;
1835            return;
1836        }
1837
1838        let mut rows = Vec::with_capacity(self.rows.len());
1839        for mut located in self.rows.drain(..) {
1840            if !located.row.payloads.is_empty() {
1841                rows.push(located);
1842                continue;
1843            }
1844            match expand_located_row_payloads(&mut located, reader_options) {
1845                Ok(()) => {
1846                    self.stats.returned_row_expansions =
1847                        self.stats.returned_row_expansions.saturating_add(1);
1848                    rows.push(located);
1849                }
1850                Err(err) => {
1851                    self.partial = true;
1852                    self.file_errors.push(format!(
1853                        "{} cursor {}: {err}",
1854                        located.file_path.display(),
1855                        located.row.cursor
1856                    ));
1857                }
1858            }
1859        }
1860        self.rows = rows;
1861        self.stats.rows_returned = self.rows.len() as u64;
1862    }
1863
1864    fn merge_stats(&mut self, stats: ExplorerStats) {
1865        self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
1866        self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
1867        self.stats.facet_rows_matched = self
1868            .stats
1869            .facet_rows_matched
1870            .saturating_add(stats.facet_rows_matched);
1871        self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
1872        self.stats.rows_unsampled = self
1873            .stats
1874            .rows_unsampled
1875            .saturating_add(stats.rows_unsampled);
1876        self.stats.rows_estimated = self
1877            .stats
1878            .rows_estimated
1879            .saturating_add(stats.rows_estimated);
1880        self.stats.sampling_sampled = self
1881            .stats
1882            .sampling_sampled
1883            .saturating_add(stats.sampling_sampled);
1884        self.stats.sampling_unsampled = self
1885            .stats
1886            .sampling_unsampled
1887            .saturating_add(stats.sampling_unsampled);
1888        self.stats.sampling_estimated = self
1889            .stats
1890            .sampling_estimated
1891            .saturating_add(stats.sampling_estimated);
1892        if stats.last_realtime_usec > self.stats.last_realtime_usec {
1893            self.stats.last_realtime_usec = stats.last_realtime_usec;
1894        }
1895        if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
1896            self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
1897        }
1898        self.stats.data_refs_seen = self
1899            .stats
1900            .data_refs_seen
1901            .saturating_add(stats.data_refs_seen);
1902        self.stats.data_refs_skipped = self
1903            .stats
1904            .data_refs_skipped
1905            .saturating_add(stats.data_refs_skipped);
1906        self.stats.data_payloads_loaded = self
1907            .stats
1908            .data_payloads_loaded
1909            .saturating_add(stats.data_payloads_loaded);
1910        self.stats.data_objects_classified = self
1911            .stats
1912            .data_objects_classified
1913            .saturating_add(stats.data_objects_classified);
1914        self.stats.data_cache_hits = self
1915            .stats
1916            .data_cache_hits
1917            .saturating_add(stats.data_cache_hits);
1918        self.stats.data_cache_misses = self
1919            .stats
1920            .data_cache_misses
1921            .saturating_add(stats.data_cache_misses);
1922        self.stats.payloads_decompressed = self
1923            .stats
1924            .payloads_decompressed
1925            .saturating_add(stats.payloads_decompressed);
1926        self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
1927        self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
1928        self.stats.histogram_updates = self
1929            .stats
1930            .histogram_updates
1931            .saturating_add(stats.histogram_updates);
1932        self.stats.returned_row_expansions = self
1933            .stats
1934            .returned_row_expansions
1935            .saturating_add(stats.returned_row_expansions);
1936        self.stats.early_stop_opportunities = self
1937            .stats
1938            .early_stop_opportunities
1939            .saturating_add(stats.early_stop_opportunities);
1940        self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
1941    }
1942
1943    fn add_zero_count_facet_values(
1944        &mut self,
1945        vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1946    ) {
1947        for (field, values) in vocabulary {
1948            let target = self.facets.entry(field.clone()).or_default();
1949            for value in values.keys() {
1950                add_netdata_facet_count(target, value, 0);
1951            }
1952        }
1953    }
1954
1955    fn add_zero_count_facet_values_from_files(
1956        &mut self,
1957        fields: &[Vec<u8>],
1958        reader_options: ReaderOptions,
1959    ) {
1960        for path in &self.matched_paths {
1961            let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
1962                continue;
1963            };
1964            for field in fields {
1965                let Ok(field_name) = std::str::from_utf8(field) else {
1966                    continue;
1967                };
1968                let Ok(values) = reader.query_unique(field_name) else {
1969                    continue;
1970                };
1971                if values.is_empty() {
1972                    continue;
1973                }
1974                let target = self.facets.entry(field.clone()).or_default();
1975                for value in values {
1976                    add_netdata_facet_count(target, &value, 0);
1977                }
1978            }
1979        }
1980    }
1981
1982    fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
1983        let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
1984        if let Some(histogram) = &request.histogram {
1985            report_fields.insert(histogram.as_bytes().to_vec());
1986        }
1987        for filter in &request.filters {
1988            if !report_fields.contains(&filter.field) {
1989                continue;
1990            }
1991            let target = self.facets.entry(filter.field.clone()).or_default();
1992            for value in &filter.values {
1993                add_netdata_facet_count(target, value, 0);
1994            }
1995        }
1996    }
1997
1998    fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
1999        string_fields(&self.reportable_facet_fields_bytes(requested))
2000    }
2001
2002    fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2003        requested
2004            .iter()
2005            .filter(|field| {
2006                self.facets
2007                    .get(*field)
2008                    .is_some_and(facet_group_is_reportable)
2009            })
2010            .cloned()
2011            .collect()
2012    }
2013}
2014
2015fn netdata_function_error(status: u64, message: &str) -> Value {
2016    json!({
2017        "status": status,
2018        "errorMessage": message,
2019    })
2020}
2021
2022fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2023    if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2024        return Value::String("OK".to_string());
2025    }
2026
2027    let total = stats
2028        .rows_examined
2029        .saturating_add(stats.rows_unsampled)
2030        .saturating_add(stats.rows_estimated)
2031        .max(1);
2032    let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2033    let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2034    let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2035
2036    let mut title = String::new();
2037    let mut description = String::new();
2038    let mut status = "notice";
2039    if timed_out {
2040        title.push_str("Query timed-out, incomplete data. ");
2041        description.push_str(
2042            "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2043        );
2044        status = "warning";
2045    }
2046    if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2047        title.push_str(&format!("{real_percent:.2}% real data"));
2048        description.push_str(&format!(
2049            "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2050        ));
2051    }
2052    if stats.rows_unsampled != 0 {
2053        title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2054        description.push_str(&format!(
2055            "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2056        ));
2057    }
2058    if stats.rows_estimated != 0 {
2059        title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2060        description.push_str(&format!(
2061            "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2062        ));
2063    }
2064
2065    json!({
2066        "title": title,
2067        "status": status,
2068        "description": description,
2069    })
2070}
2071
2072fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2073    let mut stats = completed.clone();
2074    stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2075    stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2076    stats.facet_rows_matched = stats
2077        .facet_rows_matched
2078        .saturating_add(current.facet_rows_matched);
2079    stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2080    stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2081    stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2082    stats.sampling_sampled = stats
2083        .sampling_sampled
2084        .saturating_add(current.sampling_sampled);
2085    stats.sampling_unsampled = stats
2086        .sampling_unsampled
2087        .saturating_add(current.sampling_unsampled);
2088    stats.sampling_estimated = stats
2089        .sampling_estimated
2090        .saturating_add(current.sampling_estimated);
2091    if current.last_realtime_usec > stats.last_realtime_usec {
2092        stats.last_realtime_usec = current.last_realtime_usec;
2093    }
2094    if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2095        stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2096    }
2097    stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2098    stats.data_refs_skipped = stats
2099        .data_refs_skipped
2100        .saturating_add(current.data_refs_skipped);
2101    stats.data_payloads_loaded = stats
2102        .data_payloads_loaded
2103        .saturating_add(current.data_payloads_loaded);
2104    stats.data_objects_classified = stats
2105        .data_objects_classified
2106        .saturating_add(current.data_objects_classified);
2107    stats.data_cache_hits = stats
2108        .data_cache_hits
2109        .saturating_add(current.data_cache_hits);
2110    stats.data_cache_misses = stats
2111        .data_cache_misses
2112        .saturating_add(current.data_cache_misses);
2113    stats.payloads_decompressed = stats
2114        .payloads_decompressed
2115        .saturating_add(current.payloads_decompressed);
2116    stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2117    stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2118    stats.histogram_updates = stats
2119        .histogram_updates
2120        .saturating_add(current.histogram_updates);
2121    stats.returned_row_expansions = stats
2122        .returned_row_expansions
2123        .saturating_add(current.returned_row_expansions);
2124    stats.early_stop_opportunities = stats
2125        .early_stop_opportunities
2126        .saturating_add(current.early_stop_opportunities);
2127    stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2128    stats
2129}
2130
/// Column description of a query response.
struct Columns {
    // NOTE(review): presumably the column names in presentation order — confirm at the
    // construction site, which is not visible here.
    order: Vec<String>,
    // JSON object serialized under the response's "columns" key.
    map: Map<String, Value>,
}
2135
/// Pre-rendered pieces from which the final query response object is assembled.
struct QueryResponseArtifacts {
    // NOTE(review): presumably the names of the facet fields that are reported — confirm
    // at the construction site.
    reportable_facet_field_names: Vec<String>,
    // Column definitions; `columns.map` becomes the "columns" member of the response.
    columns: Columns,
    // Serialized as the "data" member of the response.
    data: Vec<Value>,
    // Inserted under "facets" (or "facets_delta" for data-only delta responses).
    facets: Value,
    // Inserted under "histogram" / "histogram_delta"; `None` is emitted as JSON null.
    histogram: Option<Value>,
    // NOTE(review): presumably the value produced by `query_message` — confirm.
    message: Value,
    // Inserted under "items" / "items_delta".
    items: Value,
}
2145
2146fn base_query_response(
2147    request: &NetdataRequest,
2148    combined: &CombinedResult,
2149    artifacts: &QueryResponseArtifacts,
2150) -> Value {
2151    json!({
2152        "_request": &request.echo,
2153        "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2154        "_journal_files": {
2155            "matched": combined.matched_files,
2156            "skipped": combined.skipped_files,
2157            "errors": &combined.file_errors,
2158        },
2159        "status": 200,
2160        "partial": combined.partial,
2161        "type": "table",
2162        "show_ids": true,
2163        "has_history": true,
2164        "pagination": {
2165            "enabled": true,
2166            "key": "anchor",
2167            "column": "timestamp",
2168            "units": "timestamp_usec",
2169        },
2170        "columns": &artifacts.columns.map,
2171        "data": &artifacts.data,
2172        "_stats": {
2173            "sdk_explorer": &combined.stats,
2174        },
2175        "expires": if request.data_only {
2176            unix_now_seconds().saturating_add(3600)
2177        } else {
2178            0
2179        }
2180    })
2181}
2182
2183fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2184    let unsampled = combined.stats.rows_unsampled;
2185    let estimated = combined.stats.rows_estimated;
2186    let fallback_rows_after_returned =
2187        response_fallback_rows_after_returned(&combined.stats, returned);
2188    let page_counters = combined
2189        .page_counters
2190        .unwrap_or_else(|| NetdataPageCounters {
2191            matched: combined.stats.rows_matched,
2192            before: 0,
2193            after: fallback_rows_after_returned,
2194        });
2195    json!({
2196        "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2197        "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2198        "unsampled": unsampled,
2199        "estimated": estimated,
2200        "returned": returned,
2201        "max_to_return": request.limit as u64,
2202        "before": page_counters.before,
2203        "after": page_counters.after,
2204    })
2205}
2206
2207fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2208    let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2209        stats.rows_examined
2210    } else {
2211        stats.rows_matched
2212    };
2213    source_rows.saturating_sub(returned)
2214}
2215
2216fn add_last_modified_if_needed(
2217    object: &mut Map<String, Value>,
2218    request: &NetdataRequest,
2219    combined: &CombinedResult,
2220) {
2221    if !request.data_only || request.tail {
2222        object.insert(
2223            "last_modified".to_string(),
2224            Value::from(combined.stats.last_realtime_usec),
2225        );
2226    }
2227}
2228
2229fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2230    if combined.sampling_enabled {
2231        object.insert(
2232            "_sampling".to_string(),
2233            json!({
2234                "enabled": true,
2235                "sampled": combined.stats.sampling_sampled,
2236                "unsampled": combined.stats.sampling_unsampled,
2237                "estimated": combined.stats.sampling_estimated,
2238            }),
2239        );
2240    }
2241}
2242
2243fn add_analysis_outputs_if_needed(
2244    object: &mut Map<String, Value>,
2245    request: &NetdataRequest,
2246    artifacts: QueryResponseArtifacts,
2247) {
2248    if !request.data_only || request.delta {
2249        let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2250        object.insert(facets_key.to_string(), artifacts.facets);
2251        object.insert(
2252            histogram_key.to_string(),
2253            artifacts.histogram.unwrap_or(Value::Null),
2254        );
2255        object.insert(items_key.to_string(), artifacts.items);
2256    }
2257}
2258
/// Returns the response key names for `(facets, histogram, items)`.
///
/// Data-only responses use the `*_delta` variants of the keys.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2266
2267fn merge_histogram(
2268    target: &mut Option<ExplorerHistogram>,
2269    source: ExplorerHistogram,
2270) -> Result<()> {
2271    let Some(target) = target else {
2272        *target = Some(source);
2273        return Ok(());
2274    };
2275    if target.field != source.field
2276        || target.buckets.len() != source.buckets.len()
2277        || target
2278            .buckets
2279            .iter()
2280            .zip(source.buckets.iter())
2281            .any(|(target_bucket, source_bucket)| {
2282                target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2283                    || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2284            })
2285    {
2286        return Err(SdkError::Unsupported(
2287            "inconsistent Netdata histogram bucket shape",
2288        ));
2289    }
2290    for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2291        let Some(target_bucket) = target.buckets.get_mut(index) else {
2292            return Err(SdkError::Unsupported(
2293                "inconsistent Netdata histogram bucket shape",
2294            ));
2295        };
2296        for (value, count) in source_bucket.values {
2297            *target_bucket.values.entry(value).or_default() += count;
2298        }
2299    }
2300    Ok(())
2301}
2302
/// Returns `true` when the facet group has at least one value other than the empty string
/// and `-`. Counts are not considered.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    values
        .keys()
        .any(|value| !matches!(value.as_slice(), [] | [b'-']))
}
2308
2309fn netdata_facet_value(value: &[u8]) -> &[u8] {
2310    if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2311        &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2312    } else {
2313        value
2314    }
2315}
2316
2317fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2318    *target
2319        .entry(netdata_facet_value(value).to_vec())
2320        .or_default() += count;
2321}
2322
2323fn not_modified_before_scan_response(
2324    request: &NetdataRequest,
2325    selected: &SelectedJournalFiles,
2326) -> Option<Value> {
2327    if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2328        Some(netdata_function_error(
2329            304,
2330            "No new data since the previous call.",
2331        ))
2332    } else {
2333        None
2334    }
2335}
2336
2337fn should_collect_unfiltered_facet_vocabulary(
2338    request: &NetdataRequest,
2339    combined: &CombinedResult,
2340) -> bool {
2341    !request.data_only && !combined.partial && !request.filters.is_empty()
2342}
2343
2344fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2345    ProgressContext {
2346        current_file: file_index + 1,
2347        total_files,
2348        started,
2349    }
2350}
2351
2352fn emit_netdata_progress(
2353    options: &mut NetdataFunctionRunOptions<'_>,
2354    progress: NetdataFunctionProgress,
2355) {
2356    if let Some(callback) = options.progress_callback.as_deref_mut() {
2357        callback(progress);
2358    }
2359}
2360
2361fn emit_progress_for_combined(
2362    options: &mut NetdataFunctionRunOptions<'_>,
2363    combined: &CombinedResult,
2364    context: ProgressContext,
2365) {
2366    emit_netdata_progress(
2367        options,
2368        NetdataFunctionProgress {
2369            current_file: context.current_file,
2370            total_files: context.total_files,
2371            matched_files: combined.matched_files,
2372            skipped_files: combined.skipped_files,
2373            stats: combined.stats.clone(),
2374            elapsed: context.started.elapsed(),
2375        },
2376    );
2377}
2378
2379fn emit_explorer_progress(
2380    options: &mut NetdataFunctionRunOptions<'_>,
2381    combined: &CombinedResult,
2382    progress: ExplorerProgress,
2383    context: ProgressContext,
2384) {
2385    let stats = merged_progress_stats(&combined.stats, &progress.stats);
2386    if let Some(callback) = options.progress_callback.as_deref_mut() {
2387        callback(NetdataFunctionProgress {
2388            current_file: context.current_file,
2389            total_files: context.total_files,
2390            matched_files: combined.matched_files,
2391            skipped_files: combined.skipped_files,
2392            stats,
2393            elapsed: context.started.elapsed(),
2394        });
2395    }
2396}
2397
2398fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2399    options
2400        .cancellation_callback
2401        .is_some_and(|is_cancelled| is_cancelled())
2402}
2403
2404fn should_stop_before_file(
2405    combined: &mut CombinedResult,
2406    deadline: Option<Instant>,
2407    options: &NetdataFunctionRunOptions<'_>,
2408) -> bool {
2409    if request_cancelled(options) {
2410        combined.partial = true;
2411        combined.cancelled = true;
2412        return true;
2413    }
2414    if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2415        combined.partial = true;
2416        combined.timed_out = true;
2417        return true;
2418    }
2419    false
2420}
2421
2422fn collect_column_fields_for_file(
2423    reader: &mut FileReader,
2424    request: &NetdataRequest,
2425    path: &Path,
2426    combined: &mut CombinedResult,
2427) {
2428    if request.data_only {
2429        return;
2430    }
2431    match reader.enumerate_fields_indexed() {
2432        Ok(fields) => combined.add_column_fields(fields),
2433        Err(err) => combined.file_errors.push(format!(
2434            "{}: FIELD index enumeration failed: {err}",
2435            path.display()
2436        )),
2437    }
2438}
2439
2440fn record_explore_result(
2441    result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2442    path: &Path,
2443    combined: &mut CombinedResult,
2444) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2445    match result {
2446        Ok(result) => Some(result),
2447        Err(err) => {
2448            combined.skipped_files = combined.skipped_files.saturating_add(1);
2449            combined
2450                .file_errors
2451                .push(format!("{}: {err}", path.display()));
2452            None
2453        }
2454    }
2455}
2456
2457fn delta_scan_can_stop(
2458    request: &NetdataRequest,
2459    page_window: &RefCell<NetdataPageWindow>,
2460    realtime_usec: u64,
2461    rows_matched: u64,
2462    realtime_delta_usec: u64,
2463) -> bool {
2464    let mut page_window = page_window.borrow_mut();
2465    page_window.observe(realtime_usec);
2466    request.data_only
2467        && request.delta
2468        && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2469        && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2470}
2471
2472#[allow(clippy::too_many_arguments)]
2473fn finish_explored_file(
2474    options: &mut NetdataFunctionRunOptions<'_>,
2475    request: &NetdataRequest,
2476    file: &SelectedJournalFile,
2477    query: &ExplorerQuery,
2478    result: ExplorerResult,
2479    stop_reason: Option<ExplorerStopReason>,
2480    combined: &mut CombinedResult,
2481    files: &[SelectedJournalFile],
2482    file_index: usize,
2483    progress: ProgressContext,
2484) -> Result<bool> {
2485    update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2486    combined.merge(&file.path, result, query.direction, request.limit)?;
2487    emit_progress_for_combined(options, combined, progress);
2488    if request_cancelled(options) {
2489        combined.partial = true;
2490        combined.cancelled = true;
2491        return Ok(true);
2492    }
2493    if let Some(reason) = stop_reason {
2494        combined.partial = true;
2495        match reason {
2496            ExplorerStopReason::TimedOut => combined.timed_out = true,
2497            ExplorerStopReason::Cancelled => combined.cancelled = true,
2498        }
2499        return Ok(true);
2500    }
2501    Ok(request.data_only
2502        && !request.delta
2503        && !request.tail
2504        && combined.rows.len() >= request.limit
2505        && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2506}
2507
2508fn file_metadata(
2509    options: &NetdataFunctionRunOptions<'_>,
2510    path: &Path,
2511) -> Option<NetdataJournalFileMetadata> {
2512    options
2513        .state
2514        .as_deref()
2515        .and_then(|state| state.file_metadata(path))
2516}
2517
2518fn update_learned_realtime_delta(
2519    options: &mut NetdataFunctionRunOptions<'_>,
2520    path: &Path,
2521    order: &JournalFileOrderInfo,
2522    stats: &ExplorerStats,
2523) {
2524    let learned_delta = stats.max_source_realtime_delta_usec;
2525    if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2526        return;
2527    }
2528    let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2529    if learned_delta <= order.journal_vs_realtime_delta_usec {
2530        return;
2531    }
2532    if let Some(state) = options.state.as_deref_mut() {
2533        state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2534    }
2535}
2536
2537fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2538    delta_usec
2539        .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2540        .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2541}
2542
/// Test-only check of whether a file header's time span, widened on both
/// sides by `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC`, can intersect the
/// request window.
///
/// A header whose tail timestamp is zero is always treated as overlapping.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let slack = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC;
    let earliest = header.head_entry_realtime.saturating_sub(slack);
    let latest = header.tail_entry_realtime.saturating_add(slack);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);

    !(ends_before_window || starts_after_window)
}
2571
2572#[derive(Debug)]
2573struct SelectedJournalFile {
2574    path: PathBuf,
2575    order: JournalFileOrderInfo,
2576}
2577
2578#[derive(Debug, Default)]
2579struct SelectedJournalFiles {
2580    files: Vec<SelectedJournalFile>,
2581    files_are_newer: bool,
2582}
2583
2584fn select_journal_files_for_request(
2585    paths: Vec<PathBuf>,
2586    request: &NetdataRequest,
2587    reader_options: ReaderOptions,
2588    options: &NetdataFunctionRunOptions<'_>,
2589) -> SelectedJournalFiles {
2590    let mut selected = Vec::new();
2591    for path in paths {
2592        let metadata = file_metadata(options, &path);
2593        if !request.matches_source(&path, metadata.as_ref()) {
2594            continue;
2595        }
2596        let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2597        if !journal_file_order_may_overlap_request(&order, request) {
2598            continue;
2599        }
2600        selected.push(SelectedJournalFile { path, order });
2601    }
2602    selected.sort_by(|left, right| {
2603        compare_journal_file_order(&left.order, &right.order, request.direction)
2604            .then_with(|| left.path.cmp(&right.path))
2605    });
2606    let files_are_newer = selected
2607        .iter()
2608        .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2609    SelectedJournalFiles {
2610        files: selected,
2611        files_are_newer,
2612    }
2613}
2614
2615fn remaining_files_cannot_affect_data_page(
2616    combined: &CombinedResult,
2617    request: &NetdataRequest,
2618    files: &[SelectedJournalFile],
2619    next_file_index: usize,
2620) -> bool {
2621    let Some(next) = files.get(next_file_index) else {
2622        return true;
2623    };
2624    let slack = next.order.journal_vs_realtime_delta_usec;
2625    match request.direction {
2626        Direction::Backward => {
2627            let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2628            else {
2629                return false;
2630            };
2631            next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2632        }
2633        Direction::Forward => {
2634            let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2635            else {
2636                return false;
2637            };
2638            next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2639        }
2640    }
2641}
2642
2643fn journal_file_order_may_overlap_request(
2644    info: &JournalFileOrderInfo,
2645    request: &NetdataRequest,
2646) -> bool {
2647    if info.msg_last_realtime_usec == 0 {
2648        return true;
2649    }
2650
2651    let first = info
2652        .msg_first_realtime_usec
2653        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2654    let last = info
2655        .msg_last_realtime_usec
2656        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2657
2658    if request
2659        .after_realtime_usec
2660        .is_some_and(|after| last < after)
2661    {
2662        return false;
2663    }
2664    if request
2665        .before_realtime_usec
2666        .is_some_and(|before| first > before)
2667    {
2668        return false;
2669    }
2670
2671    true
2672}
2673
2674fn collect_boot_first_realtime(
2675    paths: &[PathBuf],
2676    reader_options: ReaderOptions,
2677    needed_boot_ids: &BTreeSet<Vec<u8>>,
2678) -> BTreeMap<Vec<u8>, u64> {
2679    let mut out = BTreeMap::new();
2680    if needed_boot_ids.is_empty() {
2681        return out;
2682    }
2683    for path in paths {
2684        let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2685            continue;
2686        };
2687        let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
2688            continue;
2689        };
2690        for boot_id in boot_ids {
2691            if !needed_boot_ids.contains(&boot_id) {
2692                continue;
2693            }
2694            let mut match_payload = b"_BOOT_ID=".to_vec();
2695            match_payload.extend_from_slice(&boot_id);
2696            reader.flush_matches();
2697            reader.add_match(&match_payload);
2698            reader.seek_head();
2699            if !reader.next().unwrap_or(false) {
2700                continue;
2701            }
2702            if let Ok(realtime) = reader.get_realtime_usec() {
2703                record_boot_first_realtime(&mut out, boot_id, realtime);
2704            }
2705        }
2706        reader.flush_matches();
2707    }
2708    out
2709}
2710
2711fn response_boot_ids(
2712    column_order: &[String],
2713    rows: &[LocatedRow],
2714    facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2715    histogram: Option<&ExplorerHistogram>,
2716) -> BTreeSet<Vec<u8>> {
2717    let mut boot_ids = BTreeSet::new();
2718    let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
2719    if row_needs_boot_id {
2720        for row in rows {
2721            if let Some(values) = row_fields(row).get("_BOOT_ID") {
2722                boot_ids.extend(values.iter().cloned());
2723            }
2724        }
2725    }
2726    if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
2727        boot_ids.extend(
2728            values
2729                .keys()
2730                .filter(|value| !value.is_empty() && value.as_slice() != b"-")
2731                .cloned(),
2732        );
2733    }
2734    if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
2735        for bucket in &histogram.buckets {
2736            boot_ids.extend(
2737                bucket
2738                    .values
2739                    .keys()
2740                    .filter(|value| !value.is_empty() && value.as_slice() != b"-")
2741                    .cloned(),
2742            );
2743        }
2744    }
2745    boot_ids
2746}
2747
/// Remembers `realtime_usec` for `boot_id`, keeping the smallest timestamp
/// seen so far for that boot id.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
2758
2759fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
2760    let mut fields = BTreeMap::new();
2761    for payload in &row.row.payloads {
2762        let Some((field, value)) = split_payload(payload) else {
2763            continue;
2764        };
2765        fields
2766            .entry(String::from_utf8_lossy(field).into_owned())
2767            .or_insert_with(Vec::new)
2768            .push(value.to_vec());
2769    }
2770    fields.insert(
2771        "ND_JOURNAL_FILE".to_string(),
2772        vec![row.file_path.display().to_string().into_bytes()],
2773    );
2774    if !fields.contains_key("ND_JOURNAL_PROCESS") {
2775        let process = dynamic_process_name(&fields);
2776        if !process.is_empty() {
2777            fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
2778        }
2779    }
2780    fields
2781}
2782
2783fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
2784    let base = first_value(fields, "CONTAINER_NAME")
2785        .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
2786        .or_else(|| first_value(fields, "_COMM"))
2787        .map(|value| String::from_utf8_lossy(value).into_owned())
2788        .unwrap_or_default();
2789    if base.is_empty() {
2790        return "-".to_string();
2791    }
2792    let pid = first_value(fields, "_PID").map(|value| String::from_utf8_lossy(value).into_owned());
2793    match pid {
2794        Some(pid) if !pid.is_empty() => format!("{base}[{pid}]"),
2795        _ => base,
2796    }
2797}
2798
2799fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
2800    let mut last_from = 0u64;
2801    let mut last_to = 0u64;
2802    let mut initialized = false;
2803    for row in rows {
2804        let timestamp = row.row.realtime_usec;
2805        if initialized && timestamp >= last_from && timestamp <= last_to {
2806            match direction {
2807                Direction::Backward => {
2808                    last_from = last_from.saturating_sub(1);
2809                    row.row.realtime_usec = last_from;
2810                }
2811                Direction::Forward => {
2812                    last_to = last_to.saturating_add(1);
2813                    row.row.realtime_usec = last_to;
2814                }
2815            }
2816        } else {
2817            last_from = timestamp;
2818            last_to = timestamp;
2819            initialized = true;
2820        }
2821    }
2822}
2823
/// Returns the first value stored for `field`, or `None` when the field is
/// absent or has no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    values.first().map(|value| value.as_slice())
}
2830
/// Splits a `FIELD=value` payload at its first `=`.
///
/// Returns `(field, value)` without the separator, or `None` when the
/// payload has no `=` at all.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    // `rest` starts with the separator itself, so drop that one byte.
    Some((field, &rest[1..]))
}
2835
2836fn column_metadata(key: &str, index: usize) -> Value {
2837    let (visible, filter, full_width) = match key {
2838        "timestamp" => (true, "range", false),
2839        "rowOptions" => (false, "none", false),
2840        "_HOSTNAME" => (true, "facet", false),
2841        "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
2842        "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
2843        _ if systemd_column_is_facet(key) => (false, "facet", false),
2844        _ => (false, "none", false),
2845    };
2846    let column_type = if key == "timestamp" {
2847        "timestamp"
2848    } else if key == "rowOptions" {
2849        "none"
2850    } else {
2851        "string"
2852    };
2853    let visualization = if key == "rowOptions" {
2854        "rowOptions"
2855    } else {
2856        "value"
2857    };
2858    let mut metadata = json!({
2859        "index": index,
2860        "unique_key": key == "timestamp",
2861        "name": if key == "timestamp" { "Timestamp" } else { key },
2862        "visible": visible,
2863        "type": column_type,
2864        "visualization": visualization,
2865        "value_options": {
2866            "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
2867            "decimal_points": 0,
2868            "default_value": if key == "timestamp" || key == "rowOptions" {
2869                Value::Null
2870            } else {
2871                Value::String("-".to_string())
2872            },
2873        },
2874        "sort": "ascending",
2875        "sortable": false,
2876        "sticky": false,
2877        "summary": "count",
2878        "filter": filter,
2879        "full_width": full_width,
2880        "wrap": key != "rowOptions",
2881        "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
2882    });
2883    if key == "rowOptions" {
2884        if let Some(object) = metadata.as_object_mut() {
2885            object.insert("dummy".to_string(), Value::Bool(true));
2886        }
2887    }
2888    metadata
2889}
2890
/// Decides whether a journal field is offered as a facet.
///
/// `MESSAGE_ID` always is. Otherwise any key that contains `MESSAGE` or
/// `TIMESTAMP`, or that starts with `__`, is excluded. Everything else is a
/// facet.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
2900
2901fn sort_facet_options(field: &str, options: &mut [Value]) {
2902    options.sort_by(|left, right| {
2903        let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
2904        let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
2905        if field == "PRIORITY" {
2906            return parse_priority(left_id).cmp(&parse_priority(right_id));
2907        }
2908        let left_count = left
2909            .get("count")
2910            .and_then(Value::as_u64)
2911            .unwrap_or_default();
2912        let right_count = right
2913            .get("count")
2914            .and_then(Value::as_u64)
2915            .unwrap_or_default();
2916        right_count
2917            .cmp(&left_count)
2918            .then_with(|| left_id.cmp(right_id))
2919    });
2920}
2921
2922fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
2923    let bytes = query.as_bytes();
2924    let mut index = 0usize;
2925    let mut terms = Vec::new();
2926    let mut positives = Vec::new();
2927    let mut negatives = Vec::new();
2928
2929    while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
2930        push_fts_pattern(
2931            pattern,
2932            negative,
2933            &mut terms,
2934            &mut positives,
2935            &mut negatives,
2936        );
2937    }
2938
2939    (terms, positives, negatives)
2940}
2941
2942fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
2943    while *index < bytes.len() {
2944        skip_fts_separators(bytes, index);
2945        let negative = consume_fts_negative_marker(bytes, index);
2946        let pattern = read_fts_pattern(bytes, index);
2947        if !pattern.is_empty() {
2948            return Some((pattern, negative));
2949        }
2950    }
2951    None
2952}
2953
/// Moves `*index` past any run of `|` separators starting at the cursor.
///
/// A cursor at or beyond the end of `bytes` is left untouched.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    let run_length = bytes.get(*index..).map_or(0, |rest| {
        rest.iter().take_while(|&&byte| byte == b'|').count()
    });
    *index += run_length;
}
2959
/// Consumes a single `!` at the cursor, if there is one.
///
/// Returns `true` and advances `*index` by one when the marker was present.
/// Otherwise returns `false` and leaves the cursor alone.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let is_negative = matches!(bytes.get(*index), Some(b'!'));
    if is_negative {
        *index += 1;
    }
    is_negative
}
2968
/// Reads pattern bytes from the cursor up to the next unescaped `|` or the
/// end of input.
///
/// A backslash escapes the following byte, which is then copied literally
/// (including `|` and `\`). The backslash itself is dropped. The terminating
/// separator is consumed but not returned.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut pattern = Vec::new();
    let mut escaped = false;

    while let Some(&byte) = bytes.get(*index) {
        *index += 1;
        match (byte, escaped) {
            (b'\\', false) => escaped = true,
            (b'|', false) => break,
            _ => {
                pattern.push(byte);
                escaped = false;
            }
        }
    }

    pattern
}
2987
2988fn push_fts_pattern(
2989    pattern: Vec<u8>,
2990    negative: bool,
2991    terms: &mut Vec<ExplorerFtsPattern>,
2992    positives: &mut Vec<Vec<u8>>,
2993    negatives: &mut Vec<Vec<u8>>,
2994) {
2995    terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
2996    if negative {
2997        negatives.push(pattern);
2998    } else {
2999        positives.push(pattern);
3000    }
3001}
3002
3003fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3004    let Some(Value::Object(selections)) = value else {
3005        return Vec::new();
3006    };
3007    let mut filters = Vec::new();
3008    for (field, values) in selections {
3009        if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3010            continue;
3011        }
3012        let Some(values) = parse_string_array(Some(values)) else {
3013            continue;
3014        };
3015        filters.push(ExplorerFilter::new(
3016            field.as_bytes().to_vec(),
3017            values
3018                .into_iter()
3019                .map(|value| normalize_filter_value(field, &value)),
3020        ));
3021    }
3022    filters
3023}
3024
/// The log sources a request asked for.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags named in the request.
    source_type: u64,
    /// Requested source names that are not one of the predefined groups.
    exact_sources: Vec<String>,
}
3030
3031fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3032    let mut selection = SourceSelection {
3033        source_type: SOURCE_TYPE_ALL,
3034        exact_sources: Vec::new(),
3035    };
3036    let Some(Value::Object(selections)) = value else {
3037        return selection;
3038    };
3039    let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3040        return selection;
3041    };
3042    selection.source_type = 0;
3043    for value in values {
3044        match source_type_for_name(&value) {
3045            Some(source_type) => selection.source_type |= source_type,
3046            None => selection.exact_sources.push(value),
3047        }
3048    }
3049    selection
3050}
3051
3052fn source_type_for_name(value: &str) -> Option<u64> {
3053    match value {
3054        "all" => Some(SOURCE_TYPE_ALL),
3055        "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3056        "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3057        "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3058        "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3059        "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3060        "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3061        _ => None,
3062    }
3063}
3064
3065fn journal_file_source_type(path: &Path) -> u64 {
3066    let text = path.to_string_lossy();
3067    let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3068        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3069    };
3070    if text.contains("/remote/") {
3071        return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3072    }
3073    if local_namespace_source_name(path).is_some() {
3074        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3075    }
3076    if name.starts_with("system") {
3077        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3078    }
3079    if name.starts_with("user") {
3080        return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3081    }
3082    SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3083}
3084
/// Derives a `namespace-<suffix>` source name from the file's parent
/// directory name.
///
/// The suffix is whatever follows the last `.` in that directory name.
/// Returns `None` when there is no parent directory name, it is not valid
/// UTF-8, it has no `.`, or nothing follows the last `.`.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path
        .parent()
        .and_then(Path::file_name)
        .and_then(|name| name.to_str())?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3090
3091fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3092    let text = path.to_string_lossy();
3093    if text.contains("/remote/") {
3094        let name = path.file_name()?.to_str()?;
3095        let source = name
3096            .split_once('@')
3097            .map(|(prefix, _)| prefix)
3098            .unwrap_or_else(|| {
3099                name.strip_suffix(".journal~.zst")
3100                    .or_else(|| name.strip_suffix(".journal.zst"))
3101                    .or_else(|| name.strip_suffix(".journal~"))
3102                    .or_else(|| name.strip_suffix(".journal"))
3103                    .unwrap_or(name)
3104            });
3105        return source.starts_with("remote-").then(|| source.to_string());
3106    }
3107    local_namespace_source_name(path)
3108}
3109
3110fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3111    if field == "PRIORITY" {
3112        if let Some(priority) = priority_name_to_number(value) {
3113            return priority.as_bytes().to_vec();
3114        }
3115    }
3116    value.as_bytes().to_vec()
3117}
3118
3119fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3120    let Value::Array(items) = value? else {
3121        return None;
3122    };
3123    Some(
3124        items
3125            .iter()
3126            .filter_map(Value::as_str)
3127            .map(ToOwned::to_owned)
3128            .collect(),
3129    )
3130}
3131
3132fn request_direction(object: &Map<String, Value>) -> Direction {
3133    match get_str(object, "direction").unwrap_or("backward") {
3134        "forward" | "forwards" | "next" => Direction::Forward,
3135        _ => Direction::Backward,
3136    }
3137}
3138
3139fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3140    data_only && get_bool(object, "delta").unwrap_or(false)
3141}
3142
3143fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3144    data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3145}
3146
3147fn request_anchor_and_direction(
3148    object: &Map<String, Value>,
3149    tail: bool,
3150    direction: Direction,
3151    after_realtime_usec: Option<u64>,
3152    before_realtime_usec: Option<u64>,
3153) -> (ExplorerAnchor, Direction) {
3154    let anchor = get_u64(object, "anchor")
3155        .map(normalize_timestamp_to_usec)
3156        .map(ExplorerAnchor::Realtime)
3157        .unwrap_or(ExplorerAnchor::Auto);
3158    if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3159        return (anchor, Direction::Backward);
3160    }
3161    if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3162        (ExplorerAnchor::Auto, Direction::Backward)
3163    } else {
3164        (anchor, direction)
3165    }
3166}
3167
3168fn anchor_outside_window(
3169    anchor: ExplorerAnchor,
3170    after_realtime_usec: Option<u64>,
3171    before_realtime_usec: Option<u64>,
3172) -> bool {
3173    let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3174        return false;
3175    };
3176    after_realtime_usec.is_some_and(|after| anchor_usec < after)
3177        || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3178}
3179
3180fn request_limit(object: &Map<String, Value>) -> usize {
3181    get_u64(object, "last")
3182        .filter(|value| *value != 0)
3183        .map(|value| value as usize)
3184        .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3185}
3186
3187fn request_facets(
3188    requested_facets: &Option<Vec<String>>,
3189    config: &NetdataFunctionConfig,
3190) -> Vec<Vec<u8>> {
3191    requested_facets
3192        .clone()
3193        .unwrap_or_else(|| config.default_facets.clone())
3194        .into_iter()
3195        .map(Vec::from)
3196        .collect()
3197}
3198
3199fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3200    get_str(object, "histogram")
3201        .filter(|histogram| !histogram.is_empty())
3202        .map(ToOwned::to_owned)
3203}
3204
3205fn request_histogram_or_default(
3206    requested_histogram: &Option<String>,
3207    config: &NetdataFunctionConfig,
3208) -> Option<String> {
3209    requested_histogram
3210        .clone()
3211        .or_else(|| config.default_histogram.clone())
3212}
3213
3214fn request_query(object: &Map<String, Value>) -> Option<String> {
3215    get_str(object, "query")
3216        .filter(|query| !query.is_empty())
3217        .map(ToOwned::to_owned)
3218}
3219
3220fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3221    object.get(key).and_then(Value::as_bool)
3222}
3223
3224fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3225    object.get(key).and_then(Value::as_i64)
3226}
3227
3228fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3229    object.get(key).and_then(Value::as_u64)
3230}
3231
3232fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3233    object.get(key).and_then(Value::as_str)
3234}
3235
3236fn normalize_time_window(
3237    now_seconds: i64,
3238    after: Option<i64>,
3239    before: Option<i64>,
3240) -> (Option<u64>, Option<u64>) {
3241    let mut after = after.unwrap_or(0);
3242    let mut before = before.unwrap_or(0);
3243
3244    if after == 0 && before == 0 {
3245        before = now_seconds;
3246        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3247    } else {
3248        (after, before) = relative_window_to_absolute(now_seconds, after, before);
3249    }
3250
3251    if after > before {
3252        std::mem::swap(&mut after, &mut before);
3253    }
3254    if after == before {
3255        after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3256    }
3257
3258    (
3259        Some(normalize_timestamp_to_usec_with_rounding(
3260            after.max(0) as u64,
3261            false,
3262        )),
3263        Some(normalize_timestamp_to_usec_with_rounding(
3264            before.max(0) as u64,
3265            true,
3266        )),
3267    )
3268}
3269
3270fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3271    let mut after = after;
3272    let mut before = before;
3273
3274    if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3275        if before > 0 {
3276            before = -before;
3277        }
3278        before = now_seconds.saturating_add(before);
3279    }
3280
3281    if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3282        if after > 0 {
3283            after = -after;
3284        }
3285        if after == 0 {
3286            after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3287        }
3288        after = before.saturating_add(after).saturating_add(1);
3289    }
3290
3291    if after > before {
3292        std::mem::swap(&mut after, &mut before);
3293    }
3294
3295    if before > now_seconds {
3296        let delta = before.saturating_sub(now_seconds);
3297        before = before.saturating_sub(delta);
3298        after = after.saturating_sub(delta);
3299    }
3300
3301    (after, before)
3302}
3303
3304struct RequestEchoInput<'a> {
3305    info: bool,
3306    after_realtime_usec: Option<u64>,
3307    before_realtime_usec: Option<u64>,
3308    if_modified_since_usec: u64,
3309    anchor: ExplorerAnchor,
3310    direction: Direction,
3311    limit: usize,
3312    data_only: bool,
3313    delta: bool,
3314    tail: bool,
3315    sampling: u64,
3316    source_type: u64,
3317    requested_facets: Option<&'a [String]>,
3318    selections: Option<&'a Value>,
3319    histogram: Option<&'a str>,
3320    query: Option<&'a str>,
3321}
3322
3323fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3324    let anchor_usec = match input.anchor {
3325        ExplorerAnchor::Realtime(usec) => usec,
3326        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3327    };
3328    let mut out = json!({
3329        "info": input.info,
3330        // The SDK Netdata boundary always uses indexed slice semantics. The
3331        // field remains in the echo because it is part of the plugin request
3332        // shape and downstream fixtures compare normalized requests.
3333        "slice": true,
3334        "data_only": input.data_only,
3335        "delta": input.delta,
3336        "tail": input.tail,
3337        "sampling": input.sampling,
3338        "source_type": input.source_type,
3339        "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3340        "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3341        "if_modified_since": input.if_modified_since_usec,
3342        "anchor": anchor_usec,
3343        "direction": match input.direction {
3344            Direction::Forward => "forward",
3345            Direction::Backward => "backward",
3346        },
3347        "last": input.limit,
3348        "query": input.query,
3349        "histogram": input.histogram,
3350    });
3351    if let Some(facets) = input.requested_facets {
3352        if let Some(object) = out.as_object_mut() {
3353            object.insert(
3354                "facets".to_string(),
3355                facets
3356                    .iter()
3357                    .map(|field| Value::String(field.clone()))
3358                    .collect(),
3359            );
3360        }
3361    }
3362    if let Some(Value::Object(selections)) = input.selections {
3363        let mut selections = selections.clone();
3364        if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3365            for source in sources {
3366                *source = Value::Null;
3367            }
3368        }
3369        if let Some(object) = out.as_object_mut() {
3370            object.insert("selections".to_string(), Value::Object(selections));
3371        }
3372    }
3373    out
3374}
3375
3376fn normalize_timestamp_to_usec(value: u64) -> u64 {
3377    normalize_timestamp_to_usec_with_rounding(value, false)
3378}
3379
/// Converts a timestamp to microseconds.
///
/// Values of at least `1_000_000_000_000` are returned unchanged. Smaller
/// values are multiplied by one million, and `end_of_second` then selects
/// the last microsecond of that second instead of the first. All arithmetic
/// saturates.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const PASS_THROUGH_THRESHOLD: u64 = 1_000_000_000_000;

    if value >= PASS_THROUGH_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(1_000_000);
    if end_of_second {
        start_of_second.saturating_add(999_999)
    } else {
        start_of_second
    }
}
3389
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or `0` when the system clock reads earlier than the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3396
/// Formats a byte count with binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Plain bytes and exact multiples of a unit are printed as integers. Other
/// values are printed with up to two decimals, with trailing zeros removed.
/// The unit is appended without a space, and values beyond `TiB` stay
/// expressed in `TiB`.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];

    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index < UNITS.len() - 1 {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let unit = UNITS[unit_index];

    if unit_index == 0 {
        return format!("{bytes}{unit}");
    }
    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{unit}");
    }

    // `{:.2}` always prints a decimal point, so stripping zeros stops there.
    let rounded = format!("{scaled:.2}");
    let trimmed = rounded.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{unit}")
}
3420
/// Formats a duration in seconds as space-separated components, such as
/// `1y 2mo 3d 4h 5m 6s`.
///
/// A year is 365 days and a month is 30 days. Zero components are omitted,
/// except that a zero duration prints as `0s`.
fn human_duration_seconds(seconds: u64) -> String {
    const UNITS: [(u64, &str); 5] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for (span, suffix) in UNITS {
        let count = remaining / span;
        remaining %= span;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3452
/// Result of scanning a directory tree for journal files.
#[derive(Default, Debug)]
struct JournalFileCollection {
    /// Paths of the journal files that were found.
    files: Vec<PathBuf>,
    /// Number of directories that were skipped during the scan.
    skipped: u64,
    /// Human-readable explanation for each skipped directory.
    errors: Vec<String>,
}
3459
3460fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
3461    if !path.is_dir() {
3462        return Err(SdkError::InvalidPath(format!(
3463            "not a directory: {}",
3464            path.display()
3465        )));
3466    }
3467    let mut collection = JournalFileCollection::default();
3468    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
3469    let mut visited = HashSet::new();
3470    while let Some((directory, depth)) = pending.pop_front() {
3471        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
3472        if visited.contains(&visited_key) {
3473            continue;
3474        }
3475        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
3476            collection.skipped = collection.skipped.saturating_add(1);
3477            collection.errors.push(format!(
3478                "{}: directory scan limit reached",
3479                directory.display()
3480            ));
3481            continue;
3482        }
3483        visited.insert(visited_key);
3484        let entries = match std::fs::read_dir(&directory) {
3485            Ok(entries) => entries,
3486            Err(err) if directory == path => return Err(err.into()),
3487            Err(err) => {
3488                collection.skipped = collection.skipped.saturating_add(1);
3489                collection
3490                    .errors
3491                    .push(format!("{}: {err}", directory.display()));
3492                continue;
3493            }
3494        };
3495        for entry in entries.flatten() {
3496            let entry_path = entry.path();
3497            if entry_path.is_file() && is_journal_file_name(&entry_path) {
3498                collection.files.push(entry_path);
3499            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
3500                pending.push_back((entry_path, depth + 1));
3501            }
3502        }
3503    }
3504    collection.files.sort();
3505    dedup_journal_files_by_canonical_path(&mut collection.files);
3506    Ok(collection)
3507}
3508
/// Removes entries that resolve to a canonical path already seen earlier in `files`.
///
/// The first occurrence is kept and relative order is preserved. Paths that cannot be
/// canonicalized are compared by their literal value.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut seen = HashSet::with_capacity(files.len());
    let mut unique = Vec::with_capacity(files.len());
    for path in files.drain(..) {
        let key = match std::fs::canonicalize(&path) {
            Ok(resolved) => resolved,
            Err(_) => path.clone(),
        };
        if seen.insert(key) {
            unique.push(path);
        }
    }
    *files = unique;
}
3516
/// Timestamps used to order journal files relative to each other.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime timestamp (microseconds) of the first message in the file.
    msg_first_realtime_usec: u64,
    /// Realtime timestamp (microseconds) of the last message in the file.
    msg_last_realtime_usec: u64,
    /// Last-modified time of the file, in microseconds since the Unix epoch.
    file_last_modified_usec: u64,
    /// Journal-vs-realtime delta associated with the file, in microseconds.
    journal_vs_realtime_delta_usec: u64,
}
3524
3525fn journal_file_order_info(
3526    path: &Path,
3527    reader_options: ReaderOptions,
3528    metadata: Option<&NetdataJournalFileMetadata>,
3529) -> JournalFileOrderInfo {
3530    let file_last_modified_usec = std::fs::metadata(path)
3531        .ok()
3532        .and_then(|metadata| metadata.modified().ok())
3533        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3534        .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3535        .unwrap_or_default();
3536    let file_last_modified_usec = metadata
3537        .and_then(|metadata| metadata.file_last_modified_usec)
3538        .unwrap_or(file_last_modified_usec);
3539    let journal_vs_realtime_delta_usec = metadata
3540        .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3541        .map(normalize_journal_vs_realtime_delta_usec)
3542        .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3543
3544    let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3545        return JournalFileOrderInfo {
3546            msg_first_realtime_usec: 0,
3547            msg_last_realtime_usec: file_last_modified_usec,
3548            file_last_modified_usec,
3549            journal_vs_realtime_delta_usec,
3550        };
3551    };
3552    let header = reader.header();
3553    let msg_first_realtime_usec = metadata
3554        .and_then(|metadata| metadata.msg_first_realtime_usec)
3555        .unwrap_or(header.head_entry_realtime);
3556    let msg_last_realtime_usec = metadata
3557        .and_then(|metadata| metadata.msg_last_realtime_usec)
3558        .unwrap_or_else(|| {
3559            if header.tail_entry_realtime == 0 {
3560                file_last_modified_usec
3561            } else {
3562                header.tail_entry_realtime
3563            }
3564        });
3565    JournalFileOrderInfo {
3566        msg_first_realtime_usec,
3567        msg_last_realtime_usec,
3568        file_last_modified_usec,
3569        journal_vs_realtime_delta_usec,
3570    }
3571}
3572
3573fn compare_journal_file_order(
3574    left: &JournalFileOrderInfo,
3575    right: &JournalFileOrderInfo,
3576    direction: Direction,
3577) -> Ordering {
3578    let backward = right
3579        .msg_last_realtime_usec
3580        .cmp(&left.msg_last_realtime_usec)
3581        .then_with(|| {
3582            right
3583                .file_last_modified_usec
3584                .cmp(&left.file_last_modified_usec)
3585        })
3586        .then_with(|| {
3587            right
3588                .msg_first_realtime_usec
3589                .cmp(&left.msg_first_realtime_usec)
3590        });
3591    match direction {
3592        Direction::Backward => backward,
3593        Direction::Forward => backward.reverse(),
3594    }
3595}
3596
/// Reports whether the final component of `path` has a journal file suffix.
///
/// Recognized suffixes are `.journal`, `.journal~`, `.journal.zst` and `.journal~.zst`.
/// Paths without a file name, or whose file name is not valid UTF-8, are rejected.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] =
        [".journal", ".journal~", ".journal.zst", ".journal~.zst"];

    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => JOURNAL_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3607
3608fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3609    for value in values {
3610        push_unique(target, value);
3611    }
3612}
3613
/// Converts raw field byte strings into `String`s, dropping any that are not valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    let mut names = Vec::new();
    for field in fields {
        if let Ok(text) = std::str::from_utf8(field) {
            names.push(text.to_owned());
        }
    }
    names
}
3620
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    let already_present = target
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
3627
/// Builds a key from `value` by dropping leading ASCII punctuation and lowercasing
/// the remaining ASCII letters.
fn netdata_reorder_key(value: &str) -> String {
    // Index of the first character that is not ASCII punctuation (or the end of input).
    let start = value
        .find(|character: char| !character.is_ascii_punctuation())
        .unwrap_or(value.len());
    value[start..].to_ascii_lowercase()
}
3633
3634fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
3635    histogram
3636        .buckets
3637        .first()
3638        .map(|bucket| {
3639            bucket
3640                .end_realtime_usec
3641                .saturating_sub(bucket.start_realtime_usec)
3642                .checked_div(1_000_000)
3643                .unwrap_or(1)
3644                .max(1)
3645        })
3646        .unwrap_or(1)
3647}
3648
/// Level of detail used when rendering a realtime timestamp as text.
enum TimestampPrecision {
    /// Whole seconds only.
    Seconds,
    /// Seconds followed by a six-digit fractional part.
    Micros,
}
3653
3654fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
3655    let seconds = (timestamp / 1_000_000) as i64;
3656    let micros = (timestamp % 1_000_000) as u32;
3657    DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
3658        .map(|datetime| match precision {
3659            TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3660            TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
3661        })
3662        .unwrap_or_else(|| timestamp.to_string())
3663}
3664
3665fn priority_name(raw: &str) -> Option<&'static str> {
3666    match parse_priority(raw)? {
3667        0 => Some("panic"),
3668        1 => Some("alert"),
3669        2 => Some("critical"),
3670        3 => Some("error"),
3671        4 => Some("warning"),
3672        5 => Some("notice"),
3673        6 => Some("info"),
3674        7 => Some("debug"),
3675        _ => None,
3676    }
3677}
3678
/// Maps a priority name or one of its accepted aliases to the numeric priority as a string.
///
/// Matching is exact and case-sensitive; unknown names yield `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: &[(&str, &str)] = &[
        ("panic", "0"),
        ("emergency", "0"),
        ("emerg", "0"),
        ("alert", "1"),
        ("critical", "2"),
        ("crit", "2"),
        ("error", "3"),
        ("err", "3"),
        ("warning", "4"),
        ("warn", "4"),
        ("notice", "5"),
        ("info", "6"),
        ("debug", "7"),
    ];

    ALIASES
        .iter()
        .find(|(name, _)| *name == value)
        .map(|(_, number)| *number)
}
3692
/// Parses `raw` as an unsigned 8-bit priority value, returning `None` on any parse failure.
fn parse_priority(raw: &str) -> Option<u8> {
    let parsed: Result<u8, _> = raw.parse();
    parsed.ok()
}
3696
3697fn priority_to_row_severity(raw: &[u8]) -> &'static str {
3698    let raw = String::from_utf8_lossy(raw);
3699    match parse_priority(&raw) {
3700        Some(priority) if priority <= 3 => "critical",
3701        Some(4) => "warning",
3702        Some(5) => "notice",
3703        Some(priority) if priority >= 7 => "debug",
3704        _ => "normal",
3705    }
3706}
3707
/// Maps a numeric syslog facility string to its name.
///
/// Facilities 0-11 and 16-23 (`local0`-`local7`) are known; anything else, including
/// input that does not parse as an unsigned 8-bit number, yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    // Facilities 0 through 11, indexed by number.
    const STANDARD: [&str; 12] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron",
        "authpriv", "ftp",
    ];
    // Facilities 16 through 23, indexed by (number - 16).
    const LOCAL: [&str; 8] = [
        "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7",
    ];

    let facility = usize::from(raw.parse::<u8>().ok()?);
    match facility {
        0..=11 => Some(STANDARD[facility]),
        16..=23 => Some(LOCAL[facility - 16]),
        _ => None,
    }
}
3733
/// Lookup table pairing numeric errno values with their symbolic names.
///
/// `errno_name` scans this table for an exact numeric match and renders a hit as
/// `"<number> (<NAME>)"`. Entries are listed in ascending numeric order from 1 to 133;
/// 41 and 58 have no entry, so those values are reported without a symbolic name.
///
/// NOTE(review): the numbering looks like the Linux errno list — confirm before relying
/// on it for journals written on other platforms.
3734const ERRNO_NAMES: &[(u32, &str)] = &[
3735    (1, "EPERM"),
3736    (2, "ENOENT"),
3737    (3, "ESRCH"),
3738    (4, "EINTR"),
3739    (5, "EIO"),
3740    (6, "ENXIO"),
3741    (7, "E2BIG"),
3742    (8, "ENOEXEC"),
3743    (9, "EBADF"),
3744    (10, "ECHILD"),
3745    (11, "EAGAIN"),
3746    (12, "ENOMEM"),
3747    (13, "EACCES"),
3748    (14, "EFAULT"),
3749    (15, "ENOTBLK"),
3750    (16, "EBUSY"),
3751    (17, "EEXIST"),
3752    (18, "EXDEV"),
3753    (19, "ENODEV"),
3754    (20, "ENOTDIR"),
3755    (21, "EISDIR"),
3756    (22, "EINVAL"),
3757    (23, "ENFILE"),
3758    (24, "EMFILE"),
3759    (25, "ENOTTY"),
3760    (26, "ETXTBSY"),
3761    (27, "EFBIG"),
3762    (28, "ENOSPC"),
3763    (29, "ESPIPE"),
3764    (30, "EROFS"),
3765    (31, "EMLINK"),
3766    (32, "EPIPE"),
3767    (33, "EDOM"),
3768    (34, "ERANGE"),
3769    (35, "EDEADLK"),
3770    (36, "ENAMETOOLONG"),
3771    (37, "ENOLCK"),
3772    (38, "ENOSYS"),
3773    (39, "ENOTEMPTY"),
3774    (40, "ELOOP"),
3775    (42, "ENOMSG"),
3776    (43, "EIDRM"),
3777    (44, "ECHRNG"),
3778    (45, "EL2NSYNC"),
3779    (46, "EL3HLT"),
3780    (47, "EL3RST"),
3781    (48, "ELNRNG"),
3782    (49, "EUNATCH"),
3783    (50, "ENOCSI"),
3784    (51, "EL2HLT"),
3785    (52, "EBADE"),
3786    (53, "EBADR"),
3787    (54, "EXFULL"),
3788    (55, "ENOANO"),
3789    (56, "EBADRQC"),
3790    (57, "EBADSLT"),
3791    (59, "EBFONT"),
3792    (60, "ENOSTR"),
3793    (61, "ENODATA"),
3794    (62, "ETIME"),
3795    (63, "ENOSR"),
3796    (64, "ENONET"),
3797    (65, "ENOPKG"),
3798    (66, "EREMOTE"),
3799    (67, "ENOLINK"),
3800    (68, "EADV"),
3801    (69, "ESRMNT"),
3802    (70, "ECOMM"),
3803    (71, "EPROTO"),
3804    (72, "EMULTIHOP"),
3805    (73, "EDOTDOT"),
3806    (74, "EBADMSG"),
3807    (75, "EOVERFLOW"),
3808    (76, "ENOTUNIQ"),
3809    (77, "EBADFD"),
3810    (78, "EREMCHG"),
3811    (79, "ELIBACC"),
3812    (80, "ELIBBAD"),
3813    (81, "ELIBSCN"),
3814    (82, "ELIBMAX"),
3815    (83, "ELIBEXEC"),
3816    (84, "EILSEQ"),
3817    (85, "ERESTART"),
3818    (86, "ESTRPIPE"),
3819    (87, "EUSERS"),
3820    (88, "ENOTSOCK"),
3821    (89, "EDESTADDRREQ"),
3822    (90, "EMSGSIZE"),
3823    (91, "EPROTOTYPE"),
3824    (92, "ENOPROTOOPT"),
3825    (93, "EPROTONOSUPPORT"),
3826    (94, "ESOCKTNOSUPPORT"),
3827    (95, "ENOTSUP"),
3828    (96, "EPFNOSUPPORT"),
3829    (97, "EAFNOSUPPORT"),
3830    (98, "EADDRINUSE"),
3831    (99, "EADDRNOTAVAIL"),
3832    (100, "ENETDOWN"),
3833    (101, "ENETUNREACH"),
3834    (102, "ENETRESET"),
3835    (103, "ECONNABORTED"),
3836    (104, "ECONNRESET"),
3837    (105, "ENOBUFS"),
3838    (106, "EISCONN"),
3839    (107, "ENOTCONN"),
3840    (108, "ESHUTDOWN"),
3841    (109, "ETOOMANYREFS"),
3842    (110, "ETIMEDOUT"),
3843    (111, "ECONNREFUSED"),
3844    (112, "EHOSTDOWN"),
3845    (113, "EHOSTUNREACH"),
3846    (114, "EALREADY"),
3847    (115, "EINPROGRESS"),
3848    (116, "ESTALE"),
3849    (117, "EUCLEAN"),
3850    (118, "ENOTNAM"),
3851    (119, "ENAVAIL"),
3852    (120, "EISNAM"),
3853    (121, "EREMOTEIO"),
3854    (122, "EDQUOT"),
3855    (123, "ENOMEDIUM"),
3856    (124, "EMEDIUMTYPE"),
3857    (125, "ECANCELED"),
3858    (126, "ENOKEY"),
3859    (127, "EKEYEXPIRED"),
3860    (128, "EKEYREVOKED"),
3861    (129, "EKEYREJECTED"),
3862    (130, "EOWNERDEAD"),
3863    (131, "ENOTRECOVERABLE"),
3864    (132, "ERFKILL"),
3865    (133, "EHWPOISON"),
3866];
3867
3868fn errno_name(raw: &str) -> Option<String> {
3869    let errno = raw.parse::<u32>().ok()?;
3870    let name = ERRNO_NAMES
3871        .iter()
3872        .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
3873    Some(format!("{errno} ({name})"))
3874}
3875
/// Decorates a hexadecimal capability mask with the names of the capabilities it contains.
///
/// The result is `"<raw> (NAME | NAME | ...)"`, where each name corresponds to a set bit
/// whose position indexes the table below. `raw` is returned unchanged when it does not
/// start with an ASCII digit, does not parse as a 64-bit hexadecimal number, is zero, or
/// sets no bit covered by the table.
fn cap_effective_display(raw: &str) -> String {
    // Indexed by bit position within the mask.
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    if !starts_with_digit {
        return raw.to_string();
    }
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if (mask >> bit) & 1 == 1 {
            names.push(*name);
        }
    }
    if names.is_empty() {
        return raw.to_string();
    }
    format!("{raw} ({})", names.join(" | "))
}
3940
3941fn systemd_field_display_value(
3942    context: &DisplayContext,
3943    scope: DisplayScope,
3944    field: &str,
3945    value: &[u8],
3946    resolve_user_group_names: bool,
3947) -> Value {
3948    let raw = String::from_utf8_lossy(value);
3949    match field {
3950        "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
3951        "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
3952        "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
3953        "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
3954            (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
3955            (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
3956            (None, _) => raw.into_owned(),
3957        }),
3958        "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
3959            (Some(timestamp), DisplayScope::Data) => format!(
3960                "{} ({})  ",
3961                raw,
3962                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
3963            ),
3964            (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
3965                format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
3966            }
3967            (None, _) => raw.into_owned(),
3968        }),
3969        "_UID"
3970        | "_SYSTEMD_OWNER_UID"
3971        | "OBJECT_SYSTEMD_OWNER_UID"
3972        | "OBJECT_UID"
3973        | "_AUDIT_LOGINUID"
3974        | "OBJECT_AUDIT_LOGINUID" => {
3975            if resolve_user_group_names {
3976                Value::String(cached_uid_display(context, raw.as_ref()))
3977            } else {
3978                Value::String(raw.into_owned())
3979            }
3980        }
3981        "_GID" | "OBJECT_GID" => {
3982            if resolve_user_group_names {
3983                Value::String(cached_gid_display(context, raw.as_ref()))
3984            } else {
3985                Value::String(raw.into_owned())
3986            }
3987        }
3988        "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
3989        "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
3990            Ok(timestamp) if timestamp != 0 => {
3991                format!(
3992                    "{} ({})",
3993                    raw,
3994                    format_realtime_usec(timestamp, TimestampPrecision::Micros)
3995                )
3996            }
3997            _ => raw.into_owned(),
3998        }),
3999        _ => Value::String(raw.into_owned()),
4000    }
4001}
4002
4003fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4004    if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4005        return value.clone();
4006    }
4007    let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4008    context
4009        .uid_display_cache
4010        .borrow_mut()
4011        .insert(raw.to_string(), value.clone());
4012    value
4013}
4014
4015fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4016    if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4017        return value.clone();
4018    }
4019    let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4020    context
4021        .gid_display_cache
4022        .borrow_mut()
4023        .insert(raw.to_string(), value.clone());
4024    value
4025}
4026
4027#[cfg(unix)]
4028fn resolve_uid_name(raw: &str) -> Option<String> {
4029    let uid = raw.parse::<libc::uid_t>().ok()?;
4030    let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4031    let mut result = std::ptr::null_mut();
4032    let mut buffer = vec![0i8; 16_384];
4033    let rc = unsafe {
4034        libc::getpwuid_r(
4035            uid,
4036            pwd.as_mut_ptr(),
4037            buffer.as_mut_ptr(),
4038            buffer.len(),
4039            &mut result,
4040        )
4041    };
4042    if rc != 0 || result.is_null() {
4043        return None;
4044    }
4045    let pwd = unsafe { pwd.assume_init() };
4046    Some(
4047        unsafe { CStr::from_ptr(pwd.pw_name) }
4048            .to_string_lossy()
4049            .into_owned(),
4050    )
4051}
4052
4053#[cfg(not(unix))]
4054fn resolve_uid_name(_raw: &str) -> Option<String> {
4055    None
4056}
4057
4058#[cfg(unix)]
4059fn resolve_gid_name(raw: &str) -> Option<String> {
4060    let gid = raw.parse::<libc::gid_t>().ok()?;
4061    let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4062    let mut result = std::ptr::null_mut();
4063    let mut buffer = vec![0i8; 16_384];
4064    let rc = unsafe {
4065        libc::getgrgid_r(
4066            gid,
4067            grp.as_mut_ptr(),
4068            buffer.as_mut_ptr(),
4069            buffer.len(),
4070            &mut result,
4071        )
4072    };
4073    if rc != 0 || result.is_null() {
4074        return None;
4075    }
4076    let grp = unsafe { grp.assume_init() };
4077    Some(
4078        unsafe { CStr::from_ptr(grp.gr_name) }
4079            .to_string_lossy()
4080            .into_owned(),
4081    )
4082}
4083
4084#[cfg(not(unix))]
4085fn resolve_gid_name(_raw: &str) -> Option<String> {
4086    None
4087}
4088
/// Known `MESSAGE_ID` values paired with a human-readable event name.
///
/// Each ID is written as 32 lowercase hexadecimal characters. `message_id_name` looks an
/// ID up by exact string comparison, so any other spelling of the same ID (uppercase,
/// dashed) will not match.
4089const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
4090    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
4091    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
4092    (
4093        "a596d6fe7bfa4994828e72309e95d61e",
4094        "Journal messages suppressed",
4095    ),
4096    (
4097        "e9bf28e6e834481bb6f48f548ad13606",
4098        "Journal messages missed",
4099    ),
4100    (
4101        "ec387f577b844b8fa948f33cad9a75e6",
4102        "Journal disk space usage",
4103    ),
4104    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
4105    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
4106    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
4107    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
4108    (
4109        "3354939424b4456d9802ca8333ed424a",
4110        "User Session terminated",
4111    ),
4112    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
4113    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
4114    (
4115        "24d8d4452573402496068381a6312df2",
4116        "VM or container started",
4117    ),
4118    (
4119        "58432bd3bace477cb514b56381b8a758",
4120        "VM or container stopped",
4121    ),
4122    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
4123    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
4124    (
4125        "50876a9db00f4c40bde1a2ad381c3a1b",
4126        "System configuration issues",
4127    ),
4128    (
4129        "b07a249cd024414a82dd00cd181378ff",
4130        "System start-up completed",
4131    ),
4132    (
4133        "eed00a68ffd84e31882105fd973abdd1",
4134        "User start-up completed",
4135    ),
4136    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
4137    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
4138    (
4139        "98268866d1d54a499c4e98921d93bc40",
4140        "System shutdown initiated",
4141    ),
4142    (
4143        "c14aaf76ec284a5fa1f105f88dfb061c",
4144        "System factory reset initiated",
4145    ),
4146    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
4147    (
4148        "3ed0163e868a4417ab8b9e210407a96c",
4149        "System reboot failed after crash",
4150    ),
4151    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
4152    (
4153        "5addb3a06a734d3396b794bf98fb2d01",
4154        "Init crashed no coredump",
4155    ),
4156    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
4157    (
4158        "5e6f1f5e4db64a0eaee3368249d20b94",
4159        "Init crashed unknown signal",
4160    ),
4161    (
4162        "83f84b35ee264f74a3896a9717af34cb",
4163        "Init crashed systemd signal",
4164    ),
4165    (
4166        "3a73a98baf5b4b199929e3226c0be783",
4167        "Init crashed process signal",
4168    ),
4169    (
4170        "2ed18d4f78ca47f0a9bc25271c26adb4",
4171        "Init crashed waitpid failed",
4172    ),
4173    (
4174        "56b1cd96f24246c5b607666fda952356",
4175        "Init crashed coredump failed",
4176    ),
4177    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
4178    (
4179        "38e8b1e039ad469291b18b44c553a5b7",
4180        "Crash shell failed to fork",
4181    ),
4182    (
4183        "872729b47dbe473eb768ccecd477beda",
4184        "Crash shell failed to execute",
4185    ),
4186    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
4187    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
4188    (
4189        "267437d33fdd41099ad76221cc24a335",
4190        "Battery low powering off",
4191    ),
4192    (
4193        "79e05b67bc4545d1922fe47107ee60c5",
4194        "Manager mainloop failed",
4195    ),
4196    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
4197    (
4198        "ed158c2df8884fa584eead2d902c1032",
4199        "Init failed to drop capability bounding set of usermode",
4200    ),
4201    (
4202        "42695b500df048298bee37159caa9f2e",
4203        "Init failed to drop capability bounding set",
4204    ),
4205    (
4206        "bfc2430724ab44499735b4f94cca9295",
4207        "User manager can't disable new privileges",
4208    ),
4209    (
4210        "59288af523be43a28d494e41e26e4510",
4211        "Manager failed to start default target",
4212    ),
4213    (
4214        "689b4fcc97b4486ea5da92db69c9e314",
4215        "Manager failed to isolate default target",
4216    ),
4217    (
4218        "5ed836f1766f4a8a9fc5da45aae23b29",
4219        "Manager failed to collect passed file descriptors",
4220    ),
4221    (
4222        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
4223        "Init failed to fix up environment variables",
4224    ),
4225    (
4226        "0e54470984ac419689743d957a119e2e",
4227        "Manager failed to allocate",
4228    ),
4229    (
4230        "d67fa9f847aa4b048a2ae33535331adb",
4231        "Manager failed to write Smack",
4232    ),
4233    (
4234        "af55a6f75b544431b72649f36ff6d62c",
4235        "System shutdown critical error",
4236    ),
4237    (
4238        "d18e0339efb24a068d9c1060221048c2",
4239        "Init failed to fork off valgrind",
4240    ),
4241    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
4242    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
4243    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
4244    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
4245    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
4246    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
4247    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
4248    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
4249    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
4250    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
4251    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
4252    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
4253    (
4254        "641257651c1b4ec9a8624d7a40a9e1e7",
4255        "Process execution failed",
4256    ),
4257    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
4258    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
4259    (
4260        "1dee0369c7fc4736b7099b38ecb46ee7",
4261        "Mount point is not empty",
4262    ),
4263    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
4264    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
4265    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
4266    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
4267    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
4268    ("51e171bd585248568110144c517cca53", "System undocked"),
4269    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
4270    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
4271    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
4272    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
4273    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
4274    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
4275    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
4276    (
4277        "167836df6f7f428e98147227b2dc8945",
4278        "Hibernate key long press",
4279    ),
4280    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
4281    (
4282        "1675d7f172174098b1108bf8c7dc8f5d",
4283        "DNSSEC validation failed",
4284    ),
4285    (
4286        "4d4408cfd0d144859184d1e65d7c8a65",
4287        "DNSSEC trust anchor revoked",
4288    ),
4289    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
4290    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
4291    (
4292        "1b3bb94037f04bbf81028e135a12d293",
4293        "Mount point path not suitable",
4294    ),
4295    (
4296        "010190138f494e29a0ef6669749531aa",
4297        "Device path not suitable",
4298    ),
4299    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
4300    (
4301        "1c0454c1bd2241e0ac6fefb4bc631433",
4302        "Systemd udev settle deprecated",
4303    ),
4304    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
4305    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
4306    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
4307    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
4308    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
4309    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
4310    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
4311    (
4312        "187c62eb1e7f463bb530394f52cb090f",
4313        "Portable Service attached",
4314    ),
4315    (
4316        "76c5c754d628490d8ecba4c9d042112b",
4317        "Portable Service detached",
4318    ),
4319    (
4320        "9cf56b8baf9546cf9478783a8de42113",
4321        "systemd-networkd sysctl changed by foreign process",
4322    ),
4323    (
4324        "ad7089f928ac4f7ea00c07457d47ba8a",
4325        "SRK into TPM authorization failure",
4326    ),
4327    (
4328        "b2bcbaf5edf948e093ce50bbea0e81ec",
4329        "Secure Attention Key (SAK) was pressed",
4330    ),
4331    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
4332    (
4333        "ee9799dab1e24d81b7bee7759a543e1b",
4334        "dbus activate masked unit",
4335    ),
4336    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
4337    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
4338    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
4339    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
4340    (
4341        "b209c0d9d1764ab38d13b8e00d1784d6",
4342        "dbus protocol violation",
4343    ),
4344    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
4345    (
4346        "0ce0fa61d1a9433dabd67417f6b8e535",
4347        "dbus service failed open",
4348    ),
4349    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
4350    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
4351    (
4352        "0ce153587afa4095832d233c17a88001",
4353        "Gnome SM startup succeeded",
4354    ),
4355    (
4356        "10dd2dc188b54a5e98970f56499d1f73",
4357        "Gnome SM unrecoverable failure",
4358    ),
4359    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
4360    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
4361    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
4362    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
4363    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
4364    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
4365    (
4366        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
4367        "Netdata connection from child",
4368    ),
4369    (
4370        "6e2e3839067648968b646045dbf28d66",
4371        "Netdata connection to parent",
4372    ),
4373    (
4374        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
4375        "Netdata alert transition",
4376    ),
4377    (
4378        "6db0018e83e34320ae2a659d78019fb7",
4379        "Netdata alert notification",
4380    ),
4381    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
4382    (
4383        "8ddaf5ba33a74078b609250db1e951f3",
4384        "Sensor state transition",
4385    ),
4386    (
4387        "ec87a56120d5431bace51e2fb8bba243",
4388        "Netdata log flood protection",
4389    ),
4390    (
4391        "acb33cb95778476baac702eb7e4e151d",
4392        "Netdata Cloud connection",
4393    ),
4394    (
4395        "d1f59606dd4d41e3b217a0cfcae8e632",
4396        "Netdata extreme cardinality",
4397    ),
4398    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
4399    (
4400        "4fdf40816c124623a032b7fe73beacb8",
4401        "Netdata dynamic configuration",
4402    ),
4403];
4404
4405fn message_id_name(raw: &str) -> Option<&'static str> {
4406    MESSAGE_ID_NAMES
4407        .iter()
4408        .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4409}
4410
4411#[cfg(test)]
4412mod tests {
4413    use super::*;
4414    use crate::ExplorerHistogramBucket;
4415    use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4416    use journal_core::repository::File as RepoFile;
4417    use std::cell::Cell;
4418    use std::collections::HashMap;
4419    use tempfile::TempDir;
4420
    /// In-memory `NetdataFunctionState` implementation used by the tests in this module.
    #[derive(Default)]
    struct TestNetdataState {
        /// Metadata handed back by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        /// Every `(path, delta_usec)` pair passed to
        /// `update_file_journal_vs_realtime_delta_usec`, in call order.
        updates: Vec<(PathBuf, u64)>,
    }
4426
4427    impl NetdataFunctionState for TestNetdataState {
4428        fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4429            self.metadata.get(path).cloned()
4430        }
4431
4432        fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4433            self.updates.push((path.to_path_buf(), delta_usec));
4434        }
4435    }
4436
4437    fn test_uuid(seed: u8) -> uuid::Uuid {
4438        uuid::Uuid::from_bytes([seed; 16])
4439    }
4440
4441    fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4442        write_named_netdata_test_journal(
4443            directory,
4444            "netdata-api-test.journal",
4445            count,
4446            1_700_000_000_000_000,
4447        );
4448    }
4449
4450    fn write_named_netdata_test_journal(
4451        directory: &std::path::Path,
4452        name: &str,
4453        count: usize,
4454        start_realtime_usec: u64,
4455    ) {
4456        write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4457    }
4458
4459    fn write_stepped_netdata_test_journal(
4460        directory: &std::path::Path,
4461        name: &str,
4462        count: usize,
4463        start_realtime_usec: u64,
4464        step_realtime_usec: u64,
4465    ) {
4466        std::fs::create_dir_all(directory).expect("create journal dir");
4467        let path = directory.join(name);
4468        let repo_file = RepoFile::from_path(&path).expect("repo file");
4469        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4470        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4471        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4472        for index in 0..count {
4473            let message = format!("MESSAGE=row-{index}");
4474            let service = if index % 2 == 0 {
4475                b"SERVICE=even".as_slice()
4476            } else {
4477                b"SERVICE=odd".as_slice()
4478            };
4479            let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4480            let realtime = start_realtime_usec
4481                .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4482            writer
4483                .add_entry(&mut file, &payloads, realtime, realtime)
4484                .expect("write entry");
4485        }
4486        file.sync().expect("sync journal");
4487    }
4488
4489    #[test]
4490    fn parses_netdata_selections_as_and_fields_or_values() {
4491        let request = json!({
4492            "after": 200_000_000,
4493            "before": 200_000_100,
4494            "direction": "forward",
4495            "last": 25,
4496            "facets": ["PRIORITY"],
4497            "selections": {
4498                "PRIORITY": ["warning", "error"],
4499                "_HOSTNAME": ["node-a"],
4500                "__logs_sources": ["all-local-system-logs"],
4501            }
4502        });
4503
4504        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4505            .expect("parse request");
4506        assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
4507        assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
4508        assert_eq!(parsed.direction, Direction::Forward);
4509        assert_eq!(parsed.limit, 25);
4510        assert_eq!(parsed.filters.len(), 2);
4511        assert_eq!(parsed.filters[0].field, b"PRIORITY");
4512        assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
4513        assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
4514        assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
4515    }
4516
4517    #[test]
4518    fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
4519        let request = json!({
4520            "after": 200_000_000,
4521            "before": 200_000_100,
4522            "last": 1
4523        });
4524
4525        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4526            .expect("parse request");
4527        let query =
4528            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4529
4530        assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
4531        assert_eq!(parsed.limit, 2);
4532        assert_eq!(query.limit, 2);
4533    }
4534
4535    #[test]
4536    fn netdata_facet_counts_use_native_sliced_filter_semantics() {
4537        let request = json!({
4538            "after": 200_000_000,
4539            "before": 200_000_100,
4540            "facets": ["PRIORITY"],
4541            "selections": {
4542                "PRIORITY": ["warning"]
4543            }
4544        });
4545
4546        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4547            .expect("parse request");
4548        let query =
4549            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4550
4551        assert!(!query.exclude_facet_field_filters);
4552    }
4553
4554    #[test]
4555    fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
4556        let request = json!({
4557            "after": 200_000_000,
4558            "before": 200_000_100,
4559            "facets": ["PRIORITY", "_BOOT_ID"],
4560            "selections": {
4561                "PRIORITY": ["warning"],
4562                "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
4563            }
4564        });
4565
4566        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4567            .expect("parse request");
4568        let query =
4569            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4570
4571        assert!(query.exclude_facet_field_filters);
4572    }
4573
4574    #[test]
4575    fn parses_netdata_fts_query_like_simple_pattern() {
4576        let (terms, positives, negatives) =
4577            parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
4578
4579        assert_eq!(
4580            positives,
4581            vec![
4582                b"error".to_vec(),
4583                b"warning".to_vec(),
4584                b"escaped|pipe".to_vec(),
4585                b"!literal".to_vec(),
4586                b" a*B".to_vec(),
4587            ]
4588        );
4589        assert_eq!(negatives, vec![b"debug".to_vec()]);
4590        assert_eq!(terms.len(), 6);
4591        assert!(!terms[0].negative);
4592        assert!(terms[2].negative);
4593        assert_eq!(
4594            terms[5],
4595            ExplorerFtsPattern {
4596                parts: vec![b" a".to_vec(), b"B".to_vec()],
4597                negative: false,
4598            }
4599        );
4600
4601        let request = json!({
4602            "query": r"alpha|!debug|needle\|pipe",
4603            "facets": ["PRIORITY"],
4604        });
4605        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4606            .expect("parse request");
4607        let query =
4608            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4609        assert_eq!(
4610            query.fts_patterns,
4611            vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
4612        );
4613        assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
4614        assert_eq!(query.fts_terms.len(), 3);
4615    }
4616
4617    #[test]
4618    fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
4619        let request = json!({
4620            "facets": ["PRIORITY", "_HOSTNAME"],
4621            "histogram": "PRIORITY",
4622            "last": 25
4623        });
4624
4625        let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4626            .expect("parse request");
4627        let query =
4628            parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4629
4630        assert!(!query.debug_collect_column_fields_by_row_traversal);
4631    }
4632
4633    #[test]
4634    fn netdata_function_suppresses_absent_requested_facet_groups() {
4635        let dir = TempDir::new().expect("tempdir");
4636        write_netdata_test_journal(dir.path(), 10);
4637        let request = json!({
4638            "after": 1_700_000_000,
4639            "before": 1_700_000_010,
4640            "facets": ["SERVICE", "MISSING_FIELD"],
4641            "histogram": "SERVICE",
4642            "last": 5,
4643            "slice": true
4644        });
4645        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4646
4647        let response = function
4648            .run_directory_request_json_with_options(
4649                dir.path(),
4650                &request,
4651                NetdataFunctionRunOptions::from_timeout_seconds(0),
4652            )
4653            .expect("run function");
4654
4655        let columns = response["columns"].as_object().expect("columns");
4656        assert!(columns.contains_key("SERVICE"));
4657        assert!(!columns.contains_key("MISSING_FIELD"));
4658        let facets = response["facets"].as_array().expect("facets");
4659        assert_eq!(
4660            facets
4661                .iter()
4662                .filter_map(|facet| facet["id"].as_str())
4663                .collect::<Vec<_>>(),
4664            vec!["SERVICE"]
4665        );
4666        let accepted = response["accepted_params"]
4667            .as_array()
4668            .expect("accepted params");
4669        assert!(accepted.iter().any(|value| value == "SERVICE"));
4670        assert!(!accepted.iter().any(|value| value == "MISSING_FIELD"));
4671        let histograms = response["available_histograms"]
4672            .as_array()
4673            .expect("available histograms");
4674        assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
4675        assert!(
4676            !histograms
4677                .iter()
4678                .any(|value| value["id"] == "MISSING_FIELD")
4679        );
4680    }
4681
4682    #[test]
4683    fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
4684        let dir = TempDir::new().expect("tempdir");
4685        write_netdata_test_journal(dir.path(), 10);
4686        let request = json!({
4687            "after": 1_700_000_000,
4688            "before": 1_700_000_010,
4689            "facets": ["PRIORITY"],
4690            "histogram": "PRIORITY",
4691            "selections": {
4692                "SERVICE": ["missing"]
4693            },
4694            "last": 5,
4695            "slice": true
4696        });
4697        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4698
4699        let response = function
4700            .run_directory_request_json_with_options(
4701                dir.path(),
4702                &request,
4703                NetdataFunctionRunOptions::from_timeout_seconds(0),
4704            )
4705            .expect("run function");
4706
4707        let facets = response["facets"].as_array().expect("facets");
4708        assert_eq!(facets.len(), 1);
4709        assert_eq!(facets[0]["id"], "PRIORITY");
4710        let options = facets[0]["options"].as_array().expect("options");
4711        assert!(options.iter().any(|option| {
4712            option["id"] == "6" && option["name"] == "info" && option["count"] == 0
4713        }));
4714        assert_eq!(response["items"]["matched"], 0);
4715    }
4716
4717    #[test]
4718    fn netdata_function_api_reports_progress() {
4719        let dir = TempDir::new().expect("tempdir");
4720        write_netdata_test_journal(dir.path(), 9_000);
4721        let request = json!({
4722            "after": 1_700_000_000,
4723            "before": 1_700_000_010,
4724            "facets": ["SERVICE"],
4725            "histogram": "SERVICE",
4726            "last": 0
4727        });
4728        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4729        let mut reports = 0u64;
4730        let mut progress = |progress: NetdataFunctionProgress| {
4731            reports = reports.saturating_add(1);
4732            assert_eq!(progress.current_file, 1);
4733            assert_eq!(progress.total_files, 1);
4734            assert!(progress.stats.rows_examined <= 9_000);
4735        };
4736        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4737        options.progress_interval = Duration::ZERO;
4738        options.progress_callback = Some(&mut progress);
4739
4740        let response = function
4741            .run_directory_request_json_with_options(dir.path(), &request, options)
4742            .expect("run function");
4743
4744        assert_eq!(response["status"], 200);
4745        assert!(reports > 0);
4746        assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
4747    }
4748
4749    #[test]
4750    fn netdata_function_api_reports_file_end_progress_for_small_scans() {
4751        let dir = TempDir::new().expect("tempdir");
4752        write_netdata_test_journal(dir.path(), 10);
4753        let request = json!({
4754            "after": 1_700_000_000,
4755            "before": 1_700_000_010,
4756            "facets": ["SERVICE"],
4757            "histogram": "SERVICE",
4758            "last": 0
4759        });
4760        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4761        let mut reports = 0u64;
4762        let mut last_rows_examined = 0u64;
4763        let mut progress = |progress: NetdataFunctionProgress| {
4764            reports = reports.saturating_add(1);
4765            last_rows_examined = progress.stats.rows_examined;
4766        };
4767        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4768        options.progress_callback = Some(&mut progress);
4769
4770        let response = function
4771            .run_directory_request_json_with_options(dir.path(), &request, options)
4772            .expect("run function");
4773
4774        assert_eq!(response["status"], 200);
4775        assert_eq!(reports, 1);
4776        assert_eq!(last_rows_examined, 10);
4777    }
4778
4779    #[test]
4780    fn netdata_function_progress_counts_only_query_files() {
4781        let dir = TempDir::new().expect("tempdir");
4782        write_named_netdata_test_journal(
4783            dir.path(),
4784            "old-window.journal",
4785            10,
4786            1_600_000_000_000_000,
4787        );
4788        write_named_netdata_test_journal(
4789            dir.path(),
4790            "current-window.journal",
4791            10,
4792            1_700_000_000_000_000,
4793        );
4794        let request = json!({
4795            "after": 1_700_000_000,
4796            "before": 1_700_000_010,
4797            "facets": ["SERVICE"],
4798            "histogram": "SERVICE",
4799            "last": 0
4800        });
4801        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4802        let mut reports = Vec::new();
4803        let mut progress = |progress: NetdataFunctionProgress| {
4804            reports.push((progress.current_file, progress.total_files));
4805        };
4806        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4807        options.progress_callback = Some(&mut progress);
4808
4809        let response = function
4810            .run_directory_request_json_with_options(dir.path(), &request, options)
4811            .expect("run function");
4812
4813        assert_eq!(response["status"], 200);
4814        assert_eq!(response["_journal_files"]["matched"], 1);
4815        assert_eq!(reports, vec![(1, 1)]);
4816    }
4817
4818    #[test]
4819    fn netdata_function_api_reports_cancellation() {
4820        let dir = TempDir::new().expect("tempdir");
4821        write_netdata_test_journal(dir.path(), 9_000);
4822        let request = json!({
4823            "after": 1_700_000_000,
4824            "before": 1_700_000_010,
4825            "facets": ["SERVICE"],
4826            "histogram": "SERVICE",
4827            "last": 0
4828        });
4829        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4830        let is_cancelled = || true;
4831        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4832        options.cancellation_callback = Some(&is_cancelled);
4833
4834        let response = function
4835            .run_directory_request_json_with_options(dir.path(), &request, options)
4836            .expect("run function");
4837
4838        assert_eq!(response["status"], 499);
4839        assert_eq!(response["errorMessage"], "Request cancelled.");
4840        assert_eq!(
4841            response.as_object().expect("response object").len(),
4842            2,
4843            "plugin-compatible function errors only include status and errorMessage"
4844        );
4845    }
4846
4847    #[test]
4848    fn netdata_function_api_cancels_during_active_scan() {
4849        let dir = TempDir::new().expect("tempdir");
4850        write_netdata_test_journal(dir.path(), 9_000);
4851        let request = json!({
4852            "after": 1_700_000_000,
4853            "before": 1_700_000_010,
4854            "facets": ["SERVICE"],
4855            "histogram": "SERVICE",
4856            "last": 0
4857        });
4858        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4859        let should_cancel = Cell::new(false);
4860        let mut reports = 0u64;
4861        let mut progress = |progress: NetdataFunctionProgress| {
4862            reports = reports.saturating_add(1);
4863            if progress.stats.rows_examined > 0 {
4864                should_cancel.set(true);
4865            }
4866        };
4867        let is_cancelled = || should_cancel.get();
4868        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4869        options.progress_interval = Duration::ZERO;
4870        options.progress_callback = Some(&mut progress);
4871        options.cancellation_callback = Some(&is_cancelled);
4872
4873        let response = function
4874            .run_directory_request_json_with_options(dir.path(), &request, options)
4875            .expect("run function");
4876
4877        assert_eq!(response["status"], 499);
4878        assert_eq!(response["errorMessage"], "Request cancelled.");
4879        assert!(reports > 0);
4880        assert!(should_cancel.get());
4881    }
4882
4883    #[test]
4884    fn netdata_function_api_honors_cancellation_after_final_file_progress() {
4885        let dir = TempDir::new().expect("tempdir");
4886        write_netdata_test_journal(dir.path(), 10);
4887        let request = json!({
4888            "after": 1_700_000_000,
4889            "before": 1_700_000_010,
4890            "facets": ["SERVICE"],
4891            "histogram": "SERVICE",
4892            "last": 0
4893        });
4894        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4895        let should_cancel = Cell::new(false);
4896        let mut progress = |_progress: NetdataFunctionProgress| {
4897            should_cancel.set(true);
4898        };
4899        let is_cancelled = || should_cancel.get();
4900        let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4901        options.progress_callback = Some(&mut progress);
4902        options.cancellation_callback = Some(&is_cancelled);
4903
4904        let response = function
4905            .run_directory_request_json_with_options(dir.path(), &request, options)
4906            .expect("run function");
4907
4908        assert_eq!(response["status"], 499);
4909        assert_eq!(response["errorMessage"], "Request cancelled.");
4910        assert!(should_cancel.get());
4911    }
4912
4913    #[test]
4914    fn netdata_function_api_reports_timeout_as_partial_table() {
4915        let dir = TempDir::new().expect("tempdir");
4916        write_netdata_test_journal(dir.path(), 10);
4917        let request = json!({
4918            "after": 1_700_000_000,
4919            "before": 1_700_000_010,
4920            "facets": ["SERVICE"],
4921            "histogram": "SERVICE",
4922            "last": 0
4923        });
4924        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4925        let options = NetdataFunctionRunOptions {
4926            timeout: Some(Duration::ZERO),
4927            ..NetdataFunctionRunOptions::default()
4928        };
4929
4930        let response = function
4931            .run_directory_request_json_with_options(dir.path(), &request, options)
4932            .expect("run function");
4933
4934        assert_eq!(response["status"], 200);
4935        assert_eq!(response["partial"], true);
4936        assert_eq!(response["message"]["status"], "warning");
4937        assert_eq!(
4938            response["message"]["title"],
4939            "Query timed-out, incomplete data. "
4940        );
4941    }
4942
4943    #[test]
4944    fn netdata_function_api_reports_sampling_counters() {
4945        let dir = TempDir::new().expect("tempdir");
4946        write_stepped_netdata_test_journal(
4947            dir.path(),
4948            "netdata-api-test.journal",
4949            5_000,
4950            1_700_000_000_000_000,
4951            1_000,
4952        );
4953        let request = json!({
4954            "after": 1_700_000_000,
4955            "before": 1_700_000_005,
4956            "facets": ["SERVICE"],
4957            "histogram": "SERVICE",
4958            "last": 5,
4959            "sampling": 20
4960        });
4961        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4962
4963        let response = function
4964            .run_directory_request_json_with_options(
4965                dir.path(),
4966                &request,
4967                NetdataFunctionRunOptions::from_timeout_seconds(0),
4968            )
4969            .expect("run function");
4970
4971        assert_eq!(response["status"], 200);
4972        assert!(
4973            response["_sampling"]["sampled"]
4974                .as_u64()
4975                .unwrap_or_default()
4976                > 0
4977        );
4978        assert!(
4979            response["_sampling"]["unsampled"]
4980                .as_u64()
4981                .unwrap_or_default()
4982                > 0
4983        );
4984        assert!(
4985            response["_sampling"]["estimated"]
4986                .as_u64()
4987                .unwrap_or_default()
4988                > 0
4989        );
4990        assert_eq!(
4991            response["items"]["estimated"],
4992            response["_sampling"]["estimated"]
4993        );
4994        assert!(
4995            response["items"]["unsampled"].as_u64().unwrap_or_default()
4996                < response["_sampling"]["unsampled"]
4997                    .as_u64()
4998                    .unwrap_or_default()
4999        );
5000        assert_eq!(response["message"]["status"], "notice");
5001    }
5002
5003    #[test]
5004    fn netdata_function_api_disables_sampling_for_data_only() {
5005        let dir = TempDir::new().expect("tempdir");
5006        write_netdata_test_journal(dir.path(), 5_000);
5007        let request = json!({
5008            "after": 1_700_000_000,
5009            "before": 1_700_000_010,
5010            "data_only": true,
5011            "last": 5,
5012            "sampling": 20
5013        });
5014        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5015
5016        let response = function
5017            .run_directory_request_json_with_options(
5018                dir.path(),
5019                &request,
5020                NetdataFunctionRunOptions::from_timeout_seconds(0),
5021            )
5022            .expect("run function");
5023
5024        assert_eq!(response["status"], 200);
5025        assert!(response.get("_sampling").is_none());
5026    }
5027
5028    #[test]
5029    fn normalizes_missing_time_window_to_last_hour_like_plugin() {
5030        assert_eq!(
5031            normalize_time_window(1_000_000_000, None, None),
5032            (Some(999_996_400_000_000), Some(1_000_000_000_999_999))
5033        );
5034    }
5035
5036    #[test]
5037    fn normalizes_inverted_time_window_like_plugin() {
5038        assert_eq!(
5039            normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000)),
5040            (Some(200_000_000_000_000), Some(200_000_100_999_999))
5041        );
5042    }
5043
5044    #[test]
5045    fn normalizes_equal_time_window_like_plugin() {
5046        assert_eq!(
5047            normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000)),
5048            (Some(199_996_400_000_000), Some(200_000_000_999_999))
5049        );
5050    }
5051
5052    #[test]
5053    fn normalizes_relative_time_window_like_plugin() {
5054        assert_eq!(
5055            normalize_time_window(1_000_000_000, Some(100), Some(200)),
5056            (Some(999_999_701_000_000), Some(999_999_800_999_999))
5057        );
5058    }
5059
5060    #[test]
5061    fn normalizes_missing_after_with_supplied_before_like_plugin() {
5062        assert_eq!(
5063            normalize_time_window(1_000_000_000, None, Some(200_000_000)),
5064            (Some(199_999_401_000_000), Some(200_000_000_999_999))
5065        );
5066    }
5067
5068    #[test]
5069    fn systemd_profile_transforms_priority_and_facility_for_display() {
5070        let profile = SystemdJournalProfile;
5071        let context = DisplayContext::default();
5072        assert_eq!(
5073            profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7"),
5074            json!("debug")
5075        );
5076        assert_eq!(
5077            profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3"),
5078            json!("daemon")
5079        );
5080        assert_eq!(priority_to_row_severity(b"3"), "critical");
5081        assert_eq!(priority_to_row_severity(b"6"), "normal");
5082    }
5083
5084    #[test]
5085    fn dynamic_process_name_matches_plugin_fallback_order() {
5086        let mut fields = BTreeMap::new();
5087        fields.insert("SYSLOG_IDENTIFIER".to_string(), vec![b"syslog".to_vec()]);
5088        fields.insert("_COMM".to_string(), vec![b"comm".to_vec()]);
5089        fields.insert("_PID".to_string(), vec![b"42".to_vec()]);
5090        fields.insert("SYSLOG_PID".to_string(), vec![b"99".to_vec()]);
5091        assert_eq!(dynamic_process_name(&fields), "syslog[42]");
5092
5093        fields.insert("CONTAINER_NAME".to_string(), vec![b"container".to_vec()]);
5094        assert_eq!(dynamic_process_name(&fields), "container[42]");
5095
5096        fields.remove("CONTAINER_NAME");
5097        fields.remove("SYSLOG_IDENTIFIER");
5098        fields.remove("_PID");
5099        assert_eq!(dynamic_process_name(&fields), "comm");
5100
5101        fields.remove("_COMM");
5102        fields.insert("_EXE".to_string(), vec![b"/usr/bin/app".to_vec()]);
5103        assert_eq!(dynamic_process_name(&fields), "-");
5104    }
5105
5106    #[test]
5107    fn facet_values_are_truncated_and_collapsed_like_plugin() {
5108        let prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
5109        let mut first = prefix.clone();
5110        first.extend_from_slice(b"-first");
5111        let mut second = prefix.clone();
5112        second.extend_from_slice(b"-second");
5113
5114        let mut values = BTreeMap::new();
5115        add_netdata_facet_count(&mut values, &first, 2);
5116        add_netdata_facet_count(&mut values, &second, 3);
5117
5118        assert_eq!(values.len(), 1);
5119        assert_eq!(values.get(&prefix), Some(&5));
5120    }
5121
5122    #[test]
5123    fn histogram_values_are_truncated_and_collapsed_like_plugin() {
5124        let prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
5125        let mut first = prefix.clone();
5126        first.extend_from_slice(b"-first");
5127        let mut second = prefix.clone();
5128        second.extend_from_slice(b"-second");
5129
5130        let mut values = HashMap::new();
5131        values.insert(first, 2);
5132        values.insert(second, 3);
5133        let histogram = ExplorerHistogram {
5134            field: b"TEST_FIELD".to_vec(),
5135            buckets: vec![ExplorerHistogramBucket {
5136                start_realtime_usec: 1_000_000,
5137                end_realtime_usec: 2_000_000,
5138                values,
5139            }],
5140        };
5141
5142        let function = NetdataJournalFunction::systemd_journal();
5143        let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);
5144        let labels = rendered["chart"]["result"]["labels"]
5145            .as_array()
5146            .expect("labels");
5147        assert_eq!(labels.len(), 2);
5148        assert_eq!(labels[1], Value::String(String::from_utf8(prefix).unwrap()));
5149        assert_eq!(rendered["chart"]["result"]["data"][0][1][0], json!(5));
5150    }
5151
5152    #[test]
5153    fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
5154        let mut backward = vec![
5155            test_located_row(100),
5156            test_located_row(100),
5157            test_located_row(100),
5158            test_located_row(90),
5159        ];
5160        make_row_timestamps_unique(&mut backward, Direction::Backward);
5161        assert_eq!(
5162            backward
5163                .iter()
5164                .map(|row| row.row.realtime_usec)
5165                .collect::<Vec<_>>(),
5166            vec![100, 99, 98, 90]
5167        );
5168
5169        let mut forward = vec![
5170            test_located_row(90),
5171            test_located_row(100),
5172            test_located_row(100),
5173            test_located_row(100),
5174        ];
5175        make_row_timestamps_unique(&mut forward, Direction::Forward);
5176        assert_eq!(
5177            forward
5178                .iter()
5179                .map(|row| row.row.realtime_usec)
5180                .collect::<Vec<_>>(),
5181            vec![90, 100, 101, 102]
5182        );
5183    }
5184
5185    #[test]
5186    fn page_window_counts_forward_anchor_like_netdata_facets() {
5187        let config = NetdataFunctionConfig::systemd_journal();
5188        let request = NetdataRequest::parse(
5189            &json!({
5190                "after": 1_700_000_000,
5191                "before": 1_700_000_010,
5192                "anchor": 1_700_000_005_000_000u64,
5193                "direction": "forward",
5194                "last": 2
5195            }),
5196            &config,
5197        )
5198        .expect("parse request");
5199        let mut window = NetdataPageWindow::for_request(&request);
5200
5201        for realtime_usec in [
5202            1_700_000_003_000_000,
5203            1_700_000_004_000_000,
5204            1_700_000_006_000_000,
5205            1_700_000_007_000_000,
5206            1_700_000_008_000_000,
5207        ] {
5208            window.observe(realtime_usec);
5209        }
5210
5211        let counters = window.counters();
5212        assert_eq!(counters.matched, 3);
5213        assert_eq!(counters.before, 1);
5214        assert_eq!(counters.after, 2);
5215    }
5216
5217    #[test]
5218    fn page_window_counts_backward_anchor_like_netdata_facets() {
5219        let config = NetdataFunctionConfig::systemd_journal();
5220        let request = NetdataRequest::parse(
5221            &json!({
5222                "after": 1_700_000_000,
5223                "before": 1_700_000_010,
5224                "anchor": 1_700_000_008_000_000u64,
5225                "direction": "backward",
5226                "last": 2
5227            }),
5228            &config,
5229        )
5230        .expect("parse request");
5231        let mut window = NetdataPageWindow::for_request(&request);
5232
5233        for realtime_usec in [
5234            1_700_000_009_000_000,
5235            1_700_000_007_000_000,
5236            1_700_000_006_000_000,
5237            1_700_000_005_000_000,
5238        ] {
5239            window.observe(realtime_usec);
5240        }
5241
5242        let counters = window.counters();
5243        assert_eq!(counters.matched, 3);
5244        assert_eq!(counters.before, 1);
5245        assert_eq!(counters.after, 1);
5246    }
5247
5248    #[test]
5249    fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
5250        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);
5251
5252        assert_eq!(adjuster.adjust(10), 10);
5253        assert_eq!(adjuster.adjust(10), 11);
5254        assert_eq!(adjuster.adjust(10), 12);
5255    }
5256
5257    #[test]
5258    fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
5259        let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);
5260
5261        assert_eq!(adjuster.adjust(10), 10);
5262        assert_eq!(adjuster.adjust(10), 9);
5263        assert_eq!(adjuster.adjust(10), 8);
5264    }
5265
5266    #[test]
5267    fn systemd_profile_keeps_user_group_ids_raw_by_default() {
5268        let context = DisplayContext::default();
5269        let profile = SystemdJournalProfile;
5270        assert_eq!(
5271            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5272            json!("0")
5273        );
5274        assert_eq!(
5275            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5276            json!("0")
5277        );
5278    }
5279
5280    #[cfg(unix)]
5281    #[test]
5282    fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
5283        let context = DisplayContext::default();
5284        let profile = SystemdJournalPluginProfile;
5285        assert_eq!(
5286            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5287            json!("root")
5288        );
5289        assert_eq!(
5290            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5291            json!("root")
5292        );
5293    }
5294
5295    #[cfg(unix)]
5296    #[test]
5297    fn plugin_compatible_profile_caches_user_group_resolution() {
5298        let context = DisplayContext::default();
5299        let profile = SystemdJournalPluginProfile;
5300
5301        assert_eq!(
5302            profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5303            json!("root")
5304        );
5305        assert_eq!(
5306            profile.field_display_value(&context, DisplayScope::Data, "_UID", b"0"),
5307            json!("root")
5308        );
5309        assert_eq!(
5310            profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5311            json!("root")
5312        );
5313        assert_eq!(
5314            profile.field_display_value(&context, DisplayScope::Data, "_GID", b"0"),
5315            json!("root")
5316        );
5317
5318        assert_eq!(context.uid_display_cache.borrow().len(), 1);
5319        assert_eq!(context.gid_display_cache.borrow().len(), 1);
5320    }
5321
5322    #[test]
5323    fn file_overlap_uses_netdata_max_realtime_slack() {
5324        let file_first_seconds = 200_000_000u64;
5325        let file_last_seconds = 200_000_100u64;
5326        let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
5327        let header = crate::FileHeader {
5328            signature: *b"LPKSHHRH",
5329            compatible_flags: 0,
5330            incompatible_flags: 0,
5331            state: 0,
5332            header_size: 0,
5333            n_entries: 0,
5334            head_entry_realtime: file_first_seconds * 1_000_000,
5335            tail_entry_realtime: file_last_seconds * 1_000_000,
5336            head_entry_seqnum: 0,
5337            tail_entry_seqnum: 0,
5338            tail_entry_boot_id: [0; 16],
5339            seqnum_id: [0; 16],
5340        };
5341        let config = NetdataFunctionConfig::systemd_journal();
5342
5343        let inside_slack = NetdataRequest::parse(
5344            &json!({
5345                "after": file_last_seconds + slack_seconds - 1,
5346                "before": file_last_seconds + slack_seconds + 500
5347            }),
5348            &config,
5349        )
5350        .expect("parse request");
5351        assert!(file_may_overlap_request(header, &inside_slack));
5352
5353        let outside_slack = NetdataRequest::parse(
5354            &json!({
5355                "after": file_last_seconds + slack_seconds + 1,
5356                "before": file_last_seconds + slack_seconds + 500
5357            }),
5358            &config,
5359        )
5360        .expect("parse request");
5361        assert!(!file_may_overlap_request(header, &outside_slack));
5362    }
5363
5364    #[test]
5365    fn journal_file_order_matches_plugin_comparator_shape() {
5366        let older = JournalFileOrderInfo {
5367            msg_first_realtime_usec: 100,
5368            msg_last_realtime_usec: 200,
5369            file_last_modified_usec: 200,
5370            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5371        };
5372        let newer = JournalFileOrderInfo {
5373            msg_first_realtime_usec: 100,
5374            msg_last_realtime_usec: 300,
5375            file_last_modified_usec: 100,
5376            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5377        };
5378        assert_eq!(
5379            compare_journal_file_order(&newer, &older, Direction::Backward),
5380            Ordering::Less
5381        );
5382        assert_eq!(
5383            compare_journal_file_order(&newer, &older, Direction::Forward),
5384            Ordering::Greater
5385        );
5386
5387        let newer_mtime = JournalFileOrderInfo {
5388            msg_first_realtime_usec: 100,
5389            msg_last_realtime_usec: 200,
5390            file_last_modified_usec: 300,
5391            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5392        };
5393        assert_eq!(
5394            compare_journal_file_order(&newer_mtime, &older, Direction::Backward),
5395            Ordering::Less
5396        );
5397
5398        let newer_first = JournalFileOrderInfo {
5399            msg_first_realtime_usec: 150,
5400            msg_last_realtime_usec: 200,
5401            file_last_modified_usec: 200,
5402            journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5403        };
5404        assert_eq!(
5405            compare_journal_file_order(&newer_first, &older, Direction::Backward),
5406            Ordering::Less
5407        );
5408    }
5409
5410    #[test]
5411    fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
5412        let mut boot_first = BTreeMap::new();
5413        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 300);
5414        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 100);
5415        record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 200);
5416
5417        assert_eq!(boot_first.get(b"boot-a".as_slice()), Some(&100));
5418    }
5419
5420    #[test]
5421    fn merge_histogram_rejects_inconsistent_bucket_shape() {
5422        let mut target = Some(ExplorerHistogram {
5423            field: b"PRIORITY".to_vec(),
5424            buckets: vec![ExplorerHistogramBucket {
5425                start_realtime_usec: 1_000_000,
5426                end_realtime_usec: 2_000_000,
5427                values: HashMap::new(),
5428            }],
5429        });
5430        let source = ExplorerHistogram {
5431            field: b"PRIORITY".to_vec(),
5432            buckets: vec![ExplorerHistogramBucket {
5433                start_realtime_usec: 1_000_000,
5434                end_realtime_usec: 1_500_000,
5435                values: HashMap::new(),
5436            }],
5437        };
5438
5439        let err = merge_histogram(&mut target, source).expect_err("shape mismatch");
5440        assert!(matches!(
5441            err,
5442            SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
5443        ));
5444    }
5445
5446    #[test]
5447    fn collect_journal_files_recurses_nested_directories() {
5448        let dir = TempDir::new().expect("tempdir");
5449        let nested = dir.path().join("machine").join("nested");
5450        write_named_netdata_test_journal(&nested, "system.journal", 1, 1_700_000_000_000_000);
5451
5452        let collection = collect_journal_files(dir.path()).expect("collect files");
5453
5454        assert_eq!(collection.files.len(), 1);
5455        assert_eq!(collection.skipped, 0);
5456        assert!(collection.errors.is_empty());
5457        assert_eq!(
5458            collection.files[0]
5459                .file_name()
5460                .and_then(|name| name.to_str()),
5461            Some("system.journal")
5462        );
5463    }
5464
5465    #[cfg(unix)]
5466    #[test]
5467    fn collect_journal_files_deduplicates_symlinked_directories() {
5468        let dir = TempDir::new().expect("tempdir");
5469        let real = dir.path().join("real");
5470        let link = dir.path().join("link");
5471        write_named_netdata_test_journal(&real, "system.journal", 1, 1_700_000_000_000_000);
5472        std::os::unix::fs::symlink(&real, &link).expect("symlink");
5473
5474        let collection = collect_journal_files(dir.path()).expect("collect files");
5475
5476        assert_eq!(collection.files.len(), 1);
5477        assert_eq!(collection.skipped, 0);
5478        assert!(collection.errors.is_empty());
5479    }
5480
5481    #[cfg(unix)]
5482    #[test]
5483    fn collect_journal_files_deduplicates_symlinked_files() {
5484        let dir = TempDir::new().expect("tempdir");
5485        write_named_netdata_test_journal(dir.path(), "system.journal", 1, 1_700_000_000_000_000);
5486        std::os::unix::fs::symlink(
5487            dir.path().join("system.journal"),
5488            dir.path().join("system-copy.journal"),
5489        )
5490        .expect("symlink");
5491
5492        let collection = collect_journal_files(dir.path()).expect("collect files");
5493
5494        assert_eq!(collection.files.len(), 1);
5495        assert_eq!(collection.skipped, 0);
5496        assert!(collection.errors.is_empty());
5497    }
5498
5499    #[cfg(unix)]
5500    #[test]
5501    fn collect_journal_files_reports_unreadable_subdirectories() {
5502        use std::os::unix::fs::PermissionsExt;
5503
5504        if unsafe { libc::geteuid() } == 0 {
5505            return;
5506        }
5507
5508        let dir = TempDir::new().expect("tempdir");
5509        std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
5510        let locked = dir.path().join("locked");
5511        std::fs::create_dir(&locked).expect("locked dir");
5512        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
5513            .expect("lock dir");
5514
5515        let collection = collect_journal_files(dir.path()).expect("collect files");
5516
5517        std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
5518            .expect("unlock dir");
5519        assert_eq!(collection.files.len(), 1);
5520        assert_eq!(collection.skipped, 1);
5521        assert_eq!(collection.errors.len(), 1);
5522        assert!(collection.errors[0].contains("locked"));
5523    }
5524
5525    #[test]
5526    fn source_summary_fills_missing_caller_metadata_from_header() {
5527        let dir = TempDir::new().expect("tempdir");
5528        write_named_netdata_test_journal(dir.path(), "system.journal", 2, 1_700_000_000_000_000);
5529        let path = dir.path().join("system.journal");
5530        let mut state = TestNetdataState::default();
5531        state.metadata.insert(
5532            path.clone(),
5533            NetdataJournalFileMetadata {
5534                msg_first_realtime_usec: Some(1_699_999_999_000_000),
5535                ..NetdataJournalFileMetadata::default()
5536            },
5537        );
5538        let options = NetdataFunctionRunOptions {
5539            state: Some(&mut state),
5540            ..NetdataFunctionRunOptions::default()
5541        };
5542
5543        let summary = JournalSourceSummary::from_paths(&[path], ReaderOptions::default(), &options);
5544
5545        assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
5546        assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
5547    }
5548
5549    #[test]
5550    fn source_selection_echoes_and_filters_known_groups() {
5551        let config = NetdataFunctionConfig::systemd_journal();
5552        let request = NetdataRequest::parse(
5553            &json!({
5554                "selections": {
5555                    "__logs_sources": ["all-local-system-logs"]
5556                }
5557            }),
5558            &config,
5559        )
5560        .expect("parse source-filtered request");
5561
5562        assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
5563        assert_eq!(
5564            request.echo.get("source_type").and_then(Value::as_u64),
5565            Some(SOURCE_TYPE_LOCAL_SYSTEM)
5566        );
5567        assert!(
5568            request
5569                .echo
5570                .pointer("/selections/__logs_sources/0")
5571                .is_some_and(Value::is_null)
5572        );
5573        assert!(request.matches_source(Path::new("/var/log/journal/machine/system.journal"), None));
5574        assert!(!request.matches_source(
5575            Path::new("/var/log/journal/machine/user-1000.journal"),
5576            None
5577        ));
5578    }
5579
5580    #[test]
5581    fn source_selection_uses_caller_metadata_before_filename_fallback() {
5582        let dir = TempDir::new().expect("tempdir");
5583        write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
5584        let path = dir.path().join("user-1000.journal");
5585        let config = NetdataFunctionConfig::systemd_journal();
5586        let request = NetdataRequest::parse(
5587            &json!({
5588                "after": 1_700_000_000,
5589                "before": 1_700_000_001,
5590                "selections": {
5591                    "__logs_sources": ["all-local-system-logs"]
5592                }
5593            }),
5594            &config,
5595        )
5596        .expect("parse source-filtered request");
5597        assert!(!request.matches_source(&path, None));
5598
5599        let mut state = TestNetdataState::default();
5600        state.metadata.insert(
5601            path.clone(),
5602            NetdataJournalFileMetadata {
5603                source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
5604                source_name: Some("system-registry".to_string()),
5605                ..NetdataJournalFileMetadata::default()
5606            },
5607        );
5608        let options = NetdataFunctionRunOptions {
5609            state: Some(&mut state),
5610            ..NetdataFunctionRunOptions::default()
5611        };
5612        let selected =
5613            select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
5614
5615        assert_eq!(selected.files.len(), 1);
5616    }
5617
5618    #[test]
5619    fn netdata_function_state_receives_learned_source_realtime_delta() {
5620        let dir = TempDir::new().expect("tempdir");
5621        let commit_realtime_usec = 1_700_000_030_000_000;
5622        let source_realtime_usec = commit_realtime_usec - 30_000_000;
5623        write_source_realtime_delta_journal(
5624            dir.path(),
5625            "system.journal",
5626            commit_realtime_usec,
5627            source_realtime_usec,
5628        );
5629        let request = json!({
5630            "after": 1_700_000_029,
5631            "before": 1_700_000_031,
5632            "facets": ["SERVICE"],
5633            "histogram": "SERVICE",
5634            "last": 1,
5635            "sampling": 0
5636        });
5637        let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5638        let mut state = TestNetdataState::default();
5639        let options = NetdataFunctionRunOptions {
5640            state: Some(&mut state),
5641            ..NetdataFunctionRunOptions::from_timeout_seconds(0)
5642        };
5643
5644        let response = function
5645            .run_directory_request_json_with_options(dir.path(), &request, options)
5646            .expect("run function");
5647
5648        assert_eq!(response["status"], 200);
5649        assert_eq!(state.updates.len(), 1);
5650        assert_eq!(state.updates[0].1, 30_000_000);
5651    }
5652
5653    #[test]
5654    fn source_classification_matches_plugin_filename_shape() {
5655        assert_eq!(
5656            journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
5657            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
5658        );
5659        assert_eq!(
5660            journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
5661            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
5662        );
5663        assert_eq!(
5664            journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
5665            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
5666        );
5667        assert_eq!(
5668            journal_file_source_type(Path::new(
5669                "/var/log/journal/machine.namespace/system.journal"
5670            )),
5671            SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
5672        );
5673        assert_eq!(
5674            journal_file_source_type(Path::new(
5675                "/var/log/journal/remote/remote-host-a@machine.journal"
5676            )),
5677            SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
5678        );
5679    }
5680
5681    #[test]
5682    fn exact_source_names_follow_plugin_prefixes() {
5683        assert_eq!(
5684            journal_file_exact_source_name(Path::new(
5685                "/var/log/journal/machine.namespace/system.journal"
5686            ))
5687            .as_deref(),
5688            Some("namespace-namespace")
5689        );
5690        assert_eq!(
5691            journal_file_exact_source_name(Path::new(
5692                "/var/log/journal/remote/remote-host-a@machine.journal"
5693            ))
5694            .as_deref(),
5695            Some("remote-host-a")
5696        );
5697        assert_eq!(
5698            journal_file_exact_source_name(Path::new(
5699                "/var/log/journal/remote/remote-host-b.journal~.zst"
5700            ))
5701            .as_deref(),
5702            Some("remote-host-b")
5703        );
5704    }
5705
5706    #[test]
5707    fn disposed_journal_extension_matches_plugin_scan_contract() {
5708        assert!(is_journal_file_name(Path::new("active.journal")));
5709        assert!(is_journal_file_name(Path::new("archived.journal~")));
5710        assert!(is_journal_file_name(Path::new("active.journal.zst")));
5711        assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
5712    }
5713
5714    fn test_located_row(realtime_usec: u64) -> LocatedRow {
5715        LocatedRow {
5716            file_path: PathBuf::from("test.journal"),
5717            row: ExplorerRow {
5718                realtime_usec,
5719                cursor: String::new(),
5720                payloads: Vec::new(),
5721            },
5722        }
5723    }
5724
5725    fn write_source_realtime_delta_journal(
5726        directory: &std::path::Path,
5727        name: &str,
5728        commit_realtime_usec: u64,
5729        source_realtime_usec: u64,
5730    ) {
5731        std::fs::create_dir_all(directory).expect("create journal dir");
5732        let path = directory.join(name);
5733        let repo_file = RepoFile::from_path(&path).expect("repo file");
5734        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
5735        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
5736        let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
5737        let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
5738        let payloads: [&[u8]; 4] = [
5739            b"MESSAGE=source lag test".as_slice(),
5740            b"SERVICE=delta".as_slice(),
5741            b"PRIORITY=6".as_slice(),
5742            source.as_bytes(),
5743        ];
5744        writer
5745            .add_entry(
5746                &mut file,
5747                &payloads,
5748                commit_realtime_usec,
5749                commit_realtime_usec,
5750            )
5751            .expect("write entry");
5752        file.sync().expect("sync journal");
5753    }
5754}