1use crate::explorer::{
2 ExplorerSamplingState, empty_histogram_for_query, histogram_bucket_count_for_query,
3};
4use crate::{
5 Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
6 ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
7 ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
8 FileReader, ReaderOptions, Result, SdkError,
9};
10use chrono::{DateTime, Utc};
11use serde_json::{Map, Value, json};
12use std::cell::RefCell;
13use std::cmp::{Ordering, Reverse};
14use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
15#[cfg(unix)]
16use std::ffi::CStr;
17use std::path::{Path, PathBuf};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
// Name used for `NetdataFunctionConfig::function_name` by the stock systemd-journal configuration.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
// Fallbacks substituted by `NetdataJournalFunction::new` when the configured selector strings are empty.
const DEFAULT_SOURCE_SELECTOR_NAME: &str = "Journal Sources";
const DEFAULT_SOURCE_SELECTOR_HELP: &str = "Select the logs source to query";
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
// One hour, in seconds.
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
// Three 365-day years, in seconds.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
// Ten minutes, in seconds.
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
// One hundred 365-day years, in seconds. Used as the timeout when the caller
// passes 0 seconds or relies on `NetdataFunctionRunOptions::default()`.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
// Five seconds, in microseconds; passed to `to_explorer_query` as the delta
// when no per-file value is in play.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
// Two minutes, in microseconds.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
// Facet option id emitted by `build_facets` for a zero-count empty value.
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
// Facet option name emitted by `build_facets` for a zero-count empty value.
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Source-type flags: each constant sets a single distinct bit (bits 0 through 6).
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
45
// Public aliases of the private `SOURCE_TYPE_*` bit flags; each one carries
// exactly the same value as its private counterpart.
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
53
// Fixed list of request parameter names.
// NOTE(review): presumably the base of the `accepted_params` response entry —
// the consumer (`accepted_params_from_fields`) is not visible here; confirm.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];
72
// Field names copied into `NetdataFunctionConfig::default_view_keys` by
// `NetdataFunctionConfig::systemd_journal()`. `build_columns` places these
// right after the `timestamp` and `rowOptions` columns, in this order.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];
97
// Field names copied into `NetdataFunctionConfig::default_facets` by
// `NetdataFunctionConfig::systemd_journal()`.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
158
/// Static configuration of a Netdata journal function.
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Function name; the stock configuration uses `"systemd-journal"`.
    pub function_name: String,
    /// Source selector name; `NetdataJournalFunction::new` replaces an empty
    /// string with the built-in default.
    pub source_selector_name: String,
    /// Source selector help text; `NetdataJournalFunction::new` replaces an
    /// empty string with the built-in default.
    pub source_selector_help: String,
    /// Default facet field names.
    pub default_facets: Vec<String>,
    /// Field names placed in the column order right after `timestamp` and
    /// `rowOptions` when response columns are built.
    pub default_view_keys: Vec<String>,
    /// Default histogram field, if any; the stock configuration uses `PRIORITY`.
    pub default_histogram: Option<String>,
    /// Options passed to `FileReader::open_with_options` for every journal file.
    pub reader_options: ReaderOptions,
    /// Strategy handed to the explorer for each file.
    pub explorer_strategy: ExplorerStrategy,
}
170
171impl NetdataFunctionConfig {
172 pub fn systemd_journal() -> Self {
173 Self {
174 function_name: DEFAULT_FUNCTION_NAME.to_string(),
175 source_selector_name: DEFAULT_SOURCE_SELECTOR_NAME.to_string(),
176 source_selector_help: DEFAULT_SOURCE_SELECTOR_HELP.to_string(),
177 default_facets: SYSTEMD_DEFAULT_FACETS
178 .iter()
179 .map(|field| (*field).to_string())
180 .collect(),
181 default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
182 .iter()
183 .map(|field| (*field).to_string())
184 .collect(),
185 default_histogram: Some("PRIORITY".to_string()),
186 reader_options: ReaderOptions::snapshot(),
187 explorer_strategy: ExplorerStrategy::Traversal,
188 }
189 }
190}
191
/// The default configuration is the stock systemd-journal configuration.
impl Default for NetdataFunctionConfig {
    /// Returns the same value as [`NetdataFunctionConfig::systemd_journal`].
    fn default() -> Self {
        Self::systemd_journal()
    }
}
197
/// Lookup data handed to a profile when it formats field values for display.
#[derive(Debug, Default)]
pub struct DisplayContext {
    // Filled from `collect_boot_first_realtime` when a query response is built;
    // keys are raw byte strings (boot ids at that call site).
    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    // Interior-mutable string-to-string caches.
    // NOTE(review): presumably memoized UID/GID display names — the code that
    // fills them is not visible here; confirm.
    uid_display_cache: RefCell<BTreeMap<String, String>>,
    gid_display_cache: RefCell<BTreeMap<String, String>>,
}
204
/// Tells a profile which part of the response a value is being formatted for.
#[derive(Debug, Clone, Copy)]
pub enum DisplayScope {
    /// Cell of a data row.
    Data,
    /// Name of a facet option.
    Facet,
    /// Name of a histogram dimension.
    Histogram,
}
211
212pub trait NetdataFunctionProfile {
213 fn field_display_value(
214 &self,
215 _context: &DisplayContext,
216 _scope: DisplayScope,
217 _field: &str,
218 value: &[u8],
219 ) -> Value {
220 Value::String(String::from_utf8_lossy(value).into_owned())
221 }
222
223 fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
224 match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
225 Value::String(value) => value,
226 other => other.to_string(),
227 }
228 }
229
230 fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
231 if let Some(priority) = first_value(fields, "PRIORITY") {
232 return json!({ "severity": priority_to_row_severity(priority) });
233 }
234 json!({ "severity": "normal" })
235 }
236}
237
/// Profile that formats values through `systemd_field_display_value` with its
/// final flag set to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalProfile;

/// Profile that formats values through `systemd_field_display_value` with its
/// final flag set to `true`.
// NOTE(review): presumably switches on output matching the original
// systemd-journal plugin — the helper is not visible here; confirm.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalPluginProfile;
243
impl NetdataFunctionProfile for SystemdJournalProfile {
    /// Delegates to `systemd_field_display_value`, passing `false` as the
    /// final argument. The other trait methods keep their defaults.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, false)
    }
}
255
impl NetdataFunctionProfile for SystemdJournalPluginProfile {
    /// Delegates to `systemd_field_display_value`, passing `true` as the
    /// final argument. The other trait methods keep their defaults.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, true)
    }
}
267
/// A journal function: a configuration paired with a display profile.
///
/// The profile type defaults to [`SystemdJournalProfile`].
#[derive(Debug, Clone)]
pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    // Static settings (defaults, reader options, explorer strategy).
    config: NetdataFunctionConfig,
    // Formatter used for data cells, facet option names and histogram dimensions.
    profile: P,
}
273
/// Progress snapshot delivered to `NetdataFunctionRunOptions::progress_callback`.
// NOTE(review): the code that fills these fields is not visible here; the
// per-field notes below are read from the names and types only — confirm.
#[derive(Debug, Clone)]
pub struct NetdataFunctionProgress {
    /// Presumably the position of the file being processed.
    pub current_file: usize,
    /// Presumably the number of files selected for the request.
    pub total_files: usize,
    /// Presumably the number of files opened successfully so far.
    pub matched_files: u64,
    /// Presumably the number of files skipped so far.
    pub skipped_files: u64,
    /// Explorer statistics.
    pub stats: ExplorerStats,
    /// Presumably the time elapsed since exploration started.
    pub elapsed: Duration,
}
283
/// Optional per-file metadata that a [`NetdataFunctionState`] can supply.
///
/// Every field is optional; `Default` yields a value with all fields `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetdataJournalFileMetadata {
    // NOTE(review): presumably a combination of the `NETDATA_SOURCE_TYPE_*`
    // bit flags — confirm against the consumer.
    pub source_type: Option<u64>,
    pub source_name: Option<String>,
    pub file_last_modified_usec: Option<u64>,
    pub msg_first_realtime_usec: Option<u64>,
    pub msg_last_realtime_usec: Option<u64>,
    pub journal_vs_realtime_delta_usec: Option<u64>,
}
293
/// Caller-provided state consulted, and optionally updated, during a run.
///
/// Both methods have no-op defaults, so implementors override only what they
/// can provide.
pub trait NetdataFunctionState {
    /// Returns known metadata for `_path`; the default knows nothing and
    /// returns `None`.
    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
        None
    }

    /// Receives a journal-vs-realtime delta for a file; the default discards it.
    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
}
301
/// Per-run options: time budget, callbacks and optional external state.
pub struct NetdataFunctionRunOptions<'a> {
    /// Time budget; when set, it is converted into a deadline
    /// (`Instant::now() + timeout`) right before exploration starts.
    pub timeout: Option<Duration>,
    /// Optional receiver of progress snapshots.
    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional cancellation probe, forwarded to the explorer control.
    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller-provided state.
    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Progress interval, forwarded to the explorer control.
    pub progress_interval: Duration,
}
309
310impl NetdataFunctionRunOptions<'_> {
311 pub fn from_timeout_seconds(seconds: u64) -> Self {
312 let seconds = if seconds == 0 {
313 EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
314 } else {
315 seconds
316 };
317 Self {
318 timeout: Some(Duration::from_secs(seconds)),
319 progress_callback: None,
320 cancellation_callback: None,
321 state: None,
322 progress_interval: Duration::from_millis(250),
323 }
324 }
325}
326
impl Default for NetdataFunctionRunOptions<'_> {
    /// Default options: an effectively disabled timeout
    /// (`EFFECTIVELY_DISABLED_TIMEOUT_SECONDS`), no callbacks, no state and a
    /// 250 ms progress interval.
    fn default() -> Self {
        Self {
            timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
            progress_callback: None,
            cancellation_callback: None,
            state: None,
            progress_interval: Duration::from_millis(250),
        }
    }
}
338
impl NetdataJournalFunction<SystemdJournalProfile> {
    /// Creates a function with the stock systemd-journal configuration and
    /// the [`SystemdJournalProfile`] display profile.
    pub fn systemd_journal() -> Self {
        Self {
            config: NetdataFunctionConfig::systemd_journal(),
            profile: SystemdJournalProfile,
        }
    }
}
347
impl NetdataJournalFunction<SystemdJournalPluginProfile> {
    /// Creates a function with the stock systemd-journal configuration and
    /// the [`SystemdJournalPluginProfile`] display profile.
    pub fn systemd_journal_plugin_compatible() -> Self {
        Self {
            config: NetdataFunctionConfig::systemd_journal(),
            profile: SystemdJournalPluginProfile,
        }
    }
}
356
357impl<P> NetdataJournalFunction<P>
358where
359 P: NetdataFunctionProfile,
360{
361 pub fn new(mut config: NetdataFunctionConfig, profile: P) -> Self {
362 if config.source_selector_name.is_empty() {
363 config.source_selector_name = DEFAULT_SOURCE_SELECTOR_NAME.to_string();
364 }
365 if config.source_selector_help.is_empty() {
366 config.source_selector_help = DEFAULT_SOURCE_SELECTOR_HELP.to_string();
367 }
368 Self { config, profile }
369 }
370
    /// Runs a JSON request against the journal files under `directory` using
    /// default run options.
    ///
    /// Convenience wrapper around
    /// [`Self::run_directory_request_json_with_options`]; errors are whatever
    /// that method returns.
    pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
        self.run_directory_request_json_with_options(
            directory,
            request,
            NetdataFunctionRunOptions::default(),
        )
    }
378
    /// Runs a JSON request against the journal files under `directory`.
    ///
    /// Steps: parse the request, collect journal files, answer `info`
    /// requests directly, select the files relevant to the request, return
    /// early when the not-modified check produces a response, then explore
    /// the selected files and build the query response.
    ///
    /// # Errors
    ///
    /// Propagates failures from request parsing, file collection,
    /// exploration and result finalization.
    pub fn run_directory_request_json_with_options(
        &self,
        directory: &Path,
        request: &Value,
        mut options: NetdataFunctionRunOptions<'_>,
    ) -> Result<Value> {
        let request = NetdataRequest::parse(request, &self.config)?;
        let collection = collect_journal_files(directory)?;
        let paths = collection.files;
        // `info` requests only describe the function; no file is explored.
        if request.info {
            return Ok(self.info_response(request.echo, &paths, &options));
        }
        // `paths` is moved into file selection below, but the full list is
        // still needed when the response is built, so keep a copy.
        let annotation_paths = paths.clone();

        let selected =
            select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
        if let Some(response) = not_modified_before_scan_response(&request, &selected) {
            return Ok(response);
        }
        let selected_files = selected.files;
        // The timeout starts counting here, after file selection.
        let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
        let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
        self.finalize_combined_result(
            &request,
            &selected_files,
            deadline,
            &mut options,
            &mut combined,
            collection.skipped,
            collection.errors,
        )?;
        Ok(self.query_response(request, &annotation_paths, combined))
    }
412
    /// Completes `combined` after the main exploration pass.
    ///
    /// Folds in the skipped-file count and the errors reported by directory
    /// collection. Unless the run was cancelled, it then adds zero-count
    /// facet values (skipped for `data_only` requests) and, when
    /// `should_collect_unfiltered_facet_vocabulary` asks for it, runs a second
    /// exploration with the request's unfiltered-vocabulary variant to add
    /// further zero-count facet values.
    ///
    /// # Errors
    ///
    /// Propagates failures from the second exploration pass.
    fn finalize_combined_result(
        &self,
        request: &NetdataRequest,
        selected_files: &[SelectedJournalFile],
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &mut CombinedResult,
        skipped_files: u64,
        file_errors: Vec<String>,
    ) -> Result<()> {
        combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
        combined.file_errors.extend(file_errors);
        // A cancelled run gets no further enrichment.
        if combined.cancelled {
            return Ok(());
        }
        if !request.data_only {
            combined.add_zero_count_facet_values_from_files(
                &request.facets,
                self.config.reader_options,
            );
            combined.add_zero_count_selected_filter_values(request);
        }
        if should_collect_unfiltered_facet_vocabulary(request, combined) {
            // The second pass shares the deadline of the first one.
            let vocabulary = self.explore_files(
                selected_files,
                &request.unfiltered_vocabulary(),
                deadline,
                options,
            )?;
            combined.add_zero_count_facet_values(&vocabulary.facets);
        }
        Ok(())
    }
446
    /// Runs a request given as raw JSON bytes using default run options.
    ///
    /// Convenience wrapper around
    /// [`Self::run_directory_request_bytes_with_options`]; errors are whatever
    /// that method returns.
    pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
        self.run_directory_request_bytes_with_options(
            directory,
            request,
            NetdataFunctionRunOptions::default(),
        )
    }
454
    /// Parses `request` as JSON and runs it against `directory`.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::InvalidPath` carrying the parser message when the
    /// bytes are not valid JSON; otherwise propagates the errors of
    /// [`Self::run_directory_request_json_with_options`].
    pub fn run_directory_request_bytes_with_options(
        &self,
        directory: &Path,
        request: &[u8],
        options: NetdataFunctionRunOptions<'_>,
    ) -> Result<Value> {
        // NOTE(review): a JSON syntax error is reported through the
        // `InvalidPath` variant — confirm no better-suited variant exists.
        let request: Value = serde_json::from_slice(request).map_err(|err| {
            SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
        })?;
        self.run_directory_request_json_with_options(directory, &request, options)
    }
466
    /// Explores `files` in order and merges the per-file results.
    ///
    /// The page window, sampling state and realtime adjuster are created once
    /// and shared by all files. Files that cannot be opened, or whose result
    /// `record_explore_result` rejects, are skipped. The loop ends early when
    /// `should_stop_before_file` or `finish_explored_file` says so.
    ///
    /// # Errors
    ///
    /// Propagates failures from `open_file_for_explore` and
    /// `finish_explored_file`.
    fn explore_files(
        &self,
        files: &[SelectedJournalFile],
        request: &NetdataRequest,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
    ) -> Result<CombinedResult> {
        // Request-level query: only used to set up sampling; each file gets
        // its own query inside the loop.
        let query = request.to_explorer_query(
            files.len() as u64,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let mut combined = CombinedResult::default();
        // Wrapped in `RefCell` because the per-file callbacks access it
        // through a shared reference.
        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
        combined.sampling_enabled = query.sampling.is_some();
        let mut sampling_state =
            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
        let started = Instant::now();
        let total_files = files.len();
        for (file_index, file) in files.iter().enumerate() {
            let path = &file.path;
            if should_stop_before_file(&mut combined, deadline, options) {
                break;
            }
            // `None` means the file could not be opened; the failure has
            // already been recorded in `combined`.
            let Some(mut reader) = self.open_file_for_explore(
                path,
                &mut combined,
                options,
                progress_context(file_index, total_files, started),
            )?
            else {
                continue;
            };
            combined.matched_files = combined.matched_files.saturating_add(1);
            combined.matched_paths.push(path.clone());
            // Shadows the request-level query with one built for this file.
            let query = request.file_query(files.len(), reader.header(), &file.order);
            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
            let explored = self.explore_single_file(
                &mut reader,
                request,
                &query,
                deadline,
                options,
                &combined,
                &page_window,
                sampling_state.as_mut(),
                &realtime_adjuster,
                progress_context(file_index, total_files, started),
                file.order.journal_vs_realtime_delta_usec,
            );
            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
            else {
                continue;
            };
            // A `true` return value ends the scan of the remaining files.
            if finish_explored_file(
                options,
                request,
                file,
                &query,
                result,
                stop_reason,
                &mut combined,
                files,
                file_index,
                progress_context(file_index, total_files, started),
            )? {
                break;
            }
        }
        combined.expand_row_payloads(self.config.reader_options);
        combined.page_counters = Some(page_window.into_inner().counters());
        Ok(combined)
    }
541
542 fn open_file_for_explore(
543 &self,
544 path: &Path,
545 combined: &mut CombinedResult,
546 options: &mut NetdataFunctionRunOptions<'_>,
547 progress: ProgressContext,
548 ) -> Result<Option<FileReader>> {
549 match FileReader::open_with_options(path, self.config.reader_options) {
550 Ok(reader) => Ok(Some(reader)),
551 Err(err) => {
552 combined.skipped_files = combined.skipped_files.saturating_add(1);
553 combined
554 .file_errors
555 .push(format!("{}: {err}", path.display()));
556 emit_progress_for_combined(options, combined, progress);
557 Ok(None)
558 }
559 }
560 }
561
    /// Explores one opened journal file under a freshly configured
    /// `ExplorerControl`.
    ///
    /// The control receives the deadline, the cancellation callback, the
    /// progress interval and callback, the shared sampling state, and three
    /// row callbacks: candidate filtering through the page window, realtime
    /// adjustment, and a matched-row check backed by `delta_scan_can_stop`.
    ///
    /// Returns the explorer result together with the stop reason reported by
    /// the control.
    ///
    /// # Errors
    ///
    /// Propagates failures from the reader's exploration call.
    #[allow(clippy::too_many_arguments)]
    fn explore_single_file(
        &self,
        reader: &mut FileReader,
        request: &NetdataRequest,
        query: &ExplorerQuery,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &CombinedResult,
        page_window: &RefCell<NetdataPageWindow>,
        sampling_state: Option<&mut ExplorerSamplingState>,
        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
        progress: ProgressContext,
        realtime_delta_usec: u64,
    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
        // Copied out first: the progress closure below captures `options`.
        let cancellation_callback = options.cancellation_callback;
        let progress_interval = options.progress_interval;
        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
            emit_explorer_progress(options, combined, explorer_progress, progress);
        };
        let mut control = ExplorerControl::new();
        control.set_deadline(deadline);
        control.set_cancellation_callback(cancellation_callback);
        control.set_progress_interval(progress_interval);
        control.set_progress_callback(Some(&mut explorer_progress));
        control.set_sampling_state(sampling_state);
        // Asks the shared page window about each candidate row.
        let mut candidate_row =
            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
        control.set_candidate_row_callback(Some(&mut candidate_row));
        // Routes row timestamps through the shared realtime adjuster.
        let mut adjust_realtime =
            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
        // Called for matched rows; the decision is made by `delta_scan_can_stop`.
        let mut matched_row = |realtime_usec, rows_matched| {
            delta_scan_can_stop(
                request,
                page_window,
                realtime_usec,
                rows_matched,
                realtime_delta_usec,
            )
        };
        control.set_matched_row_callback(Some(&mut matched_row));
        let result = reader.explore_with_strategy_cursor_rows_controlled(
            query,
            self.config.explorer_strategy,
            &mut control,
        )?;
        Ok((result, control.stop_reason()))
    }
611
    /// Builds the response for an `info` request.
    ///
    /// Apart from the echoed request, the accepted parameters (computed with
    /// no extra fields) and the required source parameters derived from
    /// `paths`, the response is a fixed description of the function.
    fn info_response(
        &self,
        echo: Value,
        paths: &[PathBuf],
        options: &NetdataFunctionRunOptions<'_>,
    ) -> Value {
        json!({
            "_request": echo,
            // `sdk` is the crate version captured at compile time.
            "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
            "v": 3,
            "accepted_params": self.accepted_params_from_fields(&[]),
            "required_params": self.required_source_params(paths, options),
            "show_ids": true,
            "has_history": true,
            "pagination": {
                "enabled": true,
                "key": "anchor",
                "column": "timestamp",
                "units": "timestamp_usec",
            },
            "status": 200,
            "type": "table",
            "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
        })
    }
637
    /// Turns the combined exploration result into the final JSON response.
    ///
    /// A cancelled run yields a 499 error response. Otherwise the response
    /// artifacts are built, the base response is created and the metadata
    /// entries are added to it. A 500 error response is returned if the base
    /// response is not a JSON object.
    fn query_response(
        &self,
        request: NetdataRequest,
        annotation_paths: &[PathBuf],
        combined: CombinedResult,
    ) -> Value {
        if combined.cancelled {
            return netdata_function_error(499, "Request cancelled.");
        }
        let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
        let mut response = base_query_response(&request, &combined, &artifacts);
        // Metadata is inserted key by key, which needs the response as an object.
        let Some(object) = response.as_object_mut() else {
            return netdata_function_error(500, "Internal Netdata function response error.");
        };
        self.add_query_response_metadata(object, &request, &combined, artifacts);
        response
    }
655
656 fn query_response_artifacts(
657 &self,
658 request: &NetdataRequest,
659 annotation_paths: &[PathBuf],
660 combined: &CombinedResult,
661 ) -> QueryResponseArtifacts {
662 let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
663 let reportable_facet_field_names = string_fields(&reportable_facet_fields);
664 let columns = self.build_columns(
665 &request,
666 &reportable_facet_fields,
667 &combined.rows,
668 &combined.facets,
669 &combined.column_fields,
670 );
671 let boot_ids = response_boot_ids(
672 &columns.order,
673 &combined.rows,
674 &combined.facets,
675 combined.histogram.as_ref(),
676 );
677 let context = DisplayContext {
678 boot_first_realtime: collect_boot_first_realtime(
679 annotation_paths,
680 self.config.reader_options,
681 &boot_ids,
682 ),
683 ..DisplayContext::default()
684 };
685 let data =
686 self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
687 let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
688 let histogram = self.query_histogram_artifact(request, combined, &context);
689 let message = query_message(combined.timed_out, &combined.stats);
690 let items = response_items(request, combined, data.len() as u64);
691 QueryResponseArtifacts {
692 reportable_facet_field_names,
693 columns,
694 data,
695 facets,
696 histogram,
697 message,
698 items,
699 }
700 }
701
702 fn add_query_response_metadata(
703 &self,
704 object: &mut Map<String, Value>,
705 request: &NetdataRequest,
706 combined: &CombinedResult,
707 artifacts: QueryResponseArtifacts,
708 ) {
709 if !request.data_only {
710 self.add_full_query_response_metadata(object, request, combined, &artifacts);
711 } else if request.histogram.is_some() {
712 object.insert(
713 "available_histograms".to_string(),
714 self.available_histograms(request, combined),
715 );
716 }
717 add_last_modified_if_needed(object, request, combined);
718 add_sampling_if_needed(object, combined);
719 add_analysis_outputs_if_needed(object, request, artifacts);
720 }
721
722 fn query_histogram_artifact(
723 &self,
724 request: &NetdataRequest,
725 combined: &CombinedResult,
726 context: &DisplayContext,
727 ) -> Option<Value> {
728 if let Some(histogram) = combined.histogram.as_ref() {
729 return Some(self.build_histogram(
730 context,
731 histogram,
732 combined.facets.get(&histogram.field),
733 ));
734 }
735 let histogram_field = request.histogram.as_ref()?;
736 if request.data_only && !request.delta {
737 return None;
738 }
739 let query = request.to_explorer_query(
740 combined.matched_files,
741 None,
742 NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
743 );
744 let histogram = empty_histogram_for_query(histogram_field.as_bytes(), &query);
745 Some(self.build_histogram(
746 context,
747 &histogram,
748 combined.facets.get(histogram.field.as_slice()),
749 ))
750 }
751
752 fn add_full_query_response_metadata(
753 &self,
754 object: &mut Map<String, Value>,
755 request: &NetdataRequest,
756 combined: &CombinedResult,
757 artifacts: &QueryResponseArtifacts,
758 ) {
759 object.insert("message".to_string(), artifacts.message.clone());
760 object.insert("update_every".to_string(), Value::from(1));
761 object.insert("help".to_string(), Value::Null);
762 object.insert(
763 "accepted_params".to_string(),
764 self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
765 );
766 object.insert("default_sort_column".to_string(), Value::from("timestamp"));
767 object.insert("default_charts".to_string(), Value::Array(Vec::new()));
768 object.insert(
769 "available_histograms".to_string(),
770 self.available_histograms(request, combined),
771 );
772 }
773
774 fn build_columns(
775 &self,
776 request: &NetdataRequest,
777 reportable_facet_fields: &[Vec<u8>],
778 rows: &[LocatedRow],
779 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
780 column_fields: &BTreeSet<String>,
781 ) -> Columns {
782 let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
783 push_unique_many(&mut order, &self.config.default_view_keys);
784 push_unique_many(&mut order, &string_fields(reportable_facet_fields));
785 if let Some(histogram) = &request.histogram {
786 push_unique(&mut order, histogram);
787 }
788 for field in column_fields {
789 push_unique(&mut order, field);
790 }
791
792 for (field, values) in facets {
793 if !facet_group_is_reportable(values) {
794 continue;
795 }
796 push_unique(&mut order, &String::from_utf8_lossy(field));
797 }
798 for row in rows {
799 let fields = row_fields(row);
800 for field in fields.keys() {
801 push_unique(&mut order, field);
802 }
803 }
804
805 let mut map = Map::new();
806 for (index, key) in order.iter().enumerate() {
807 map.insert(key.clone(), column_metadata(key, index));
808 }
809 Columns { order, map }
810 }
811
812 fn build_data_rows(
813 &self,
814 context: &DisplayContext,
815 column_order: &[String],
816 rows: &[LocatedRow],
817 direction: Direction,
818 ) -> Vec<Value> {
819 let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
820 Direction::Forward => Box::new(rows.iter().rev()),
821 Direction::Backward => Box::new(rows.iter()),
822 };
823 row_iter
824 .map(|located| {
825 let fields = row_fields(located);
826 let mut row = Vec::with_capacity(column_order.len());
827 for column in column_order {
828 let value = match column.as_str() {
829 "timestamp" => Value::from(located.row.realtime_usec),
830 "rowOptions" => self.profile.row_options(&fields),
831 field => first_value(&fields, field)
832 .map(|value| {
833 self.profile.field_display_value(
834 context,
835 DisplayScope::Data,
836 field,
837 value,
838 )
839 })
840 .unwrap_or(Value::Null),
841 };
842 row.push(value);
843 }
844 Value::Array(row)
845 })
846 .collect()
847 }
848
849 fn build_facets(
850 &self,
851 context: &DisplayContext,
852 requested: &[Vec<u8>],
853 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
854 ) -> Value {
855 let mut out = Vec::new();
856 for (order, field) in requested.iter().enumerate() {
857 let values = facets.get(field);
858 let field_name = String::from_utf8_lossy(field).into_owned();
859 let mut options: Vec<_> = values
860 .into_iter()
861 .flat_map(|values| values.iter())
862 .filter(|(value, count)| {
863 (!value.is_empty() && value.as_slice() != b"-")
864 || (**count == 0 && value.is_empty())
865 })
866 .map(|(value, count)| {
867 if *count == 0 && value.is_empty() {
868 return json!({
869 "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
870 "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
871 "count": count,
872 });
873 }
874 json!({
875 "id": String::from_utf8_lossy(value).into_owned(),
876 "name": self.profile.facet_option_name(context, &field_name, value),
877 "count": count,
878 })
879 })
880 .collect();
881 sort_facet_options(&field_name, &mut options);
882 for (idx, option) in options.iter_mut().enumerate() {
883 if let Some(object) = option.as_object_mut() {
884 object.insert("order".to_string(), Value::from((idx + 1) as u64));
885 }
886 }
887 out.push(json!({
888 "id": field_name,
889 "name": String::from_utf8_lossy(field).into_owned(),
890 "order": order + 1,
891 "options": options,
892 }));
893 }
894 Value::Array(out)
895 }
896
    /// Renders a histogram as a chart-style JSON object.
    ///
    /// Dimensions are the values found in the histogram buckets plus, when
    /// `known_values` is given, its non-empty, non-`-` keys. Each data point
    /// is `[time, [count, 0, 0], ...]` with one triple per dimension, in
    /// sorted dimension order.
    fn build_histogram(
        &self,
        context: &DisplayContext,
        histogram: &ExplorerHistogram,
        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
    ) -> Value {
        let field = String::from_utf8_lossy(&histogram.field).into_owned();
        let mut dimension_ids = BTreeSet::new();
        let mut buckets = Vec::with_capacity(histogram.buckets.len());
        for bucket in &histogram.buckets {
            // Re-accumulate the bucket counts through `add_netdata_facet_count`.
            let mut values = BTreeMap::new();
            for (value, count) in &bucket.values {
                add_netdata_facet_count(&mut values, value, *count);
            }
            for value in values.keys() {
                dimension_ids.insert(value.clone());
            }
            buckets.push((bucket.start_realtime_usec, values));
        }
        // Snapshot of the dimensions that occur in at least one bucket, taken
        // before the known facet values are merged in.
        let actual_dimension_ids = dimension_ids.clone();
        if let Some(known_values) = known_values {
            for value in known_values.keys() {
                if value.is_empty() || value.as_slice() == b"-" {
                    continue;
                }
                dimension_ids.insert(value.clone());
            }
        }
        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
        let metadata = self.histogram_chart_metadata(
            context,
            &field,
            &buckets,
            &actual_dimension_ids,
            &dimension_ids,
        );
        let data: Vec<Value> = buckets
            .iter()
            .map(|(start_realtime_usec, values)| {
                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
                // Bucket start divided by 1000 (presumably microseconds to
                // milliseconds — confirm against the consumer).
                point.push(Value::from(start_realtime_usec / 1000));
                for value in &dimension_ids {
                    // Missing from this bucket: 0 if the dimension occurs in
                    // some bucket, `null` if it is only a known facet value.
                    let count = values
                        .get(value)
                        .copied()
                        .map(Value::from)
                        .unwrap_or_else(|| {
                            if actual_dimension_ids.contains(value) {
                                Value::from(0)
                            } else {
                                Value::Null
                            }
                        });
                    // Positions follow the `point` index map below: value, arp, pa.
                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
                }
                Value::Array(point)
            })
            .collect();

        json!({
            "id": field,
            "name": field,
            "chart": {
                "summary": metadata.summary,
                "totals": metadata.totals,
                "result": {
                    "labels": metadata.result_labels,
                    "point": { "value": 0, "arp": 1, "pa": 2 },
                    "data": data,
                },
                "db": {
                    "tiers": 1,
                    "update_every": histogram_update_every_seconds(histogram),
                    "units": "events",
                    "dimensions": {
                        "ids": metadata.ids.clone(),
                        "names": metadata.names.clone(),
                        "units": metadata.units.clone(),
                        "sts": metadata.stats.clone(),
                    },
                    "per_tier": [{
                        "tier": 0,
                        "queries": 1,
                        "points": metadata.points,
                        "update_every": histogram_update_every_seconds(histogram),
                    }],
                },
                "view": {
                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
                    "update_every": histogram_update_every_seconds(histogram),
                    "after": histogram_after_seconds(histogram),
                    "before": histogram_before_seconds(histogram),
                    "units": "events",
                    "chart_type": "stackedBar",
                    "dimensions": {
                        "grouped_by": ["dimension"],
                        "ids": metadata.ids,
                        "names": metadata.names,
                        "colors": metadata.colors,
                        "units": metadata.units,
                        "sts": metadata.stats,
                    },
                    "min": metadata.min,
                    "max": metadata.max,
                },
                "agents": [{
                    "mg": "default",
                    "nm": "facets.histogram",
                    "now": now_unix_seconds(),
                    "ai": 0,
                }],
            }
        })
    }
1011
1012 fn histogram_chart_metadata(
1013 &self,
1014 context: &DisplayContext,
1015 field: &str,
1016 buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
1017 actual_dimensions: &BTreeSet<Vec<u8>>,
1018 dimensions: &[Vec<u8>],
1019 ) -> HistogramChartMetadata {
1020 let mut ids = Vec::with_capacity(dimensions.len());
1021 let mut names = Vec::with_capacity(dimensions.len());
1022 let mut colors = Vec::with_capacity(dimensions.len());
1023 let mut units = Vec::with_capacity(dimensions.len());
1024 let mut min_values = Vec::with_capacity(dimensions.len());
1025 let mut max_values = Vec::with_capacity(dimensions.len());
1026 let mut avg_values = Vec::with_capacity(dimensions.len());
1027 let mut arp_values = Vec::with_capacity(dimensions.len());
1028 let mut con_values = Vec::with_capacity(dimensions.len());
1029 let mut summary_dimensions = Vec::with_capacity(dimensions.len());
1030 let mut result_labels = vec![Value::String("time".to_string())];
1031
1032 let total: u64 = dimensions
1033 .iter()
1034 .map(|dimension| {
1035 buckets
1036 .iter()
1037 .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
1038 .sum::<u64>()
1039 })
1040 .sum();
1041
1042 let mut min = 0;
1043 let mut max = 0;
1044 let mut points = 0;
1045 for (priority, dimension) in dimensions.iter().enumerate() {
1046 let id = String::from_utf8_lossy(dimension).into_owned();
1047 let display = match self.profile.field_display_value(
1048 context,
1049 DisplayScope::Histogram,
1050 field,
1051 dimension,
1052 ) {
1053 Value::String(value) => value,
1054 other => other.to_string(),
1055 };
1056 let (dimension_min, dimension_max, dimension_sum, actual) =
1057 histogram_dimension_stats(buckets, actual_dimensions, dimension);
1058 let dimension_average = if actual && !buckets.is_empty() {
1059 dimension_sum as f64 / buckets.len() as f64
1060 } else {
1061 0.0
1062 };
1063 if actual {
1064 if points == 0 || dimension_min < min {
1065 min = dimension_min;
1066 }
1067 if dimension_max > max {
1068 max = dimension_max;
1069 }
1070 points += buckets.len() as u64;
1071 }
1072 let contribution = if total > 0 {
1073 dimension_sum as f64 * 100.0 / total as f64
1074 } else {
1075 0.0
1076 };
1077
1078 ids.push(Value::String(id.clone()));
1079 names.push(Value::String(display.clone()));
1080 colors.push(Value::Null);
1081 units.push(Value::String("events".to_string()));
1082 result_labels.push(Value::String(display.clone()));
1083 min_values.push(Value::from(dimension_min));
1084 max_values.push(Value::from(dimension_max));
1085 avg_values.push(Value::from(dimension_average));
1086 arp_values.push(Value::from(0));
1087 con_values.push(Value::from(contribution));
1088 summary_dimensions.push(json!({
1089 "id": id,
1090 "nm": display,
1091 "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
1092 "sts": {
1093 "min": dimension_min,
1094 "max": dimension_max,
1095 "avg": dimension_average,
1096 "con": contribution,
1097 },
1098 "pri": priority as u64,
1099 }));
1100 }
1101
1102 let stats = json!({
1103 "min": min_values,
1104 "max": max_values,
1105 "avg": avg_values,
1106 "arp": arp_values,
1107 "con": con_values,
1108 });
1109 let summary_stats = json!({
1110 "min": min,
1111 "max": max,
1112 "avg": histogram_average(total, points),
1113 "con": 100.0,
1114 });
1115 let dimensions_len = dimensions.len() as u64;
1116
1117 HistogramChartMetadata {
1118 ids,
1119 names,
1120 colors,
1121 units,
1122 stats,
1123 summary: json!({
1124 "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
1125 "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
1126 "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
1127 "dimensions": summary_dimensions,
1128 "labels": [],
1129 "alerts": [],
1130 }),
1131 totals: histogram_totals(dimensions_len),
1132 result_labels,
1133 min,
1134 max,
1135 points,
1136 }
1137 }
1138
1139 fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1140 NETDATA_ACCEPTED_PARAMS
1141 .iter()
1142 .copied()
1143 .chain(fields.iter().map(String::as_str))
1144 .map(|field| Value::String(field.to_string()))
1145 .collect()
1146 }
1147
1148 fn required_source_params(
1149 &self,
1150 paths: &[PathBuf],
1151 options: &NetdataFunctionRunOptions<'_>,
1152 ) -> Value {
1153 let mut all = JournalSourceSummary::default();
1154 let mut local = JournalSourceSummary::default();
1155 let mut local_namespaces = JournalSourceSummary::default();
1156 let mut local_system = JournalSourceSummary::default();
1157 let mut local_user = JournalSourceSummary::default();
1158 let mut remote = JournalSourceSummary::default();
1159 let mut other = JournalSourceSummary::default();
1160 let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1161
1162 for path in paths {
1163 let metadata = file_metadata(options, path);
1164 let source_type = metadata
1165 .as_ref()
1166 .and_then(|metadata| metadata.source_type)
1167 .unwrap_or_else(|| journal_file_source_type(path));
1168 all.add_path(path, self.config.reader_options, metadata.as_ref());
1169 if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1170 local.add_path(path, self.config.reader_options, metadata.as_ref());
1171 }
1172 if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1173 local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1174 }
1175 if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1176 local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1177 }
1178 if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1179 local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1180 }
1181 if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1182 remote.add_path(path, self.config.reader_options, metadata.as_ref());
1183 }
1184 if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1185 other.add_path(path, self.config.reader_options, metadata.as_ref());
1186 }
1187 let source_name = metadata
1188 .as_ref()
1189 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1190 .or_else(|| journal_file_exact_source_name(path));
1191 if let Some(source_name) = source_name {
1192 exact.entry(source_name).or_default().add_path(
1193 path,
1194 self.config.reader_options,
1195 metadata.as_ref(),
1196 );
1197 }
1198 }
1199
1200 let mut source_options = Vec::new();
1201 push_source_option(&mut source_options, "all", &all);
1202 push_source_option(&mut source_options, "all-local-logs", &local);
1203 push_source_option(
1204 &mut source_options,
1205 "all-local-namespaces",
1206 &local_namespaces,
1207 );
1208 push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1209 push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1210 push_source_option(&mut source_options, "all-remote-systems", &remote);
1211 push_source_option(&mut source_options, "all-uncategorized", &other);
1212 for (name, summary) in exact {
1213 push_source_option(&mut source_options, &name, &summary);
1214 }
1215
1216 json!([{
1217 "id": "__logs_sources",
1218 "name": self.config.source_selector_name.as_str(),
1219 "help": self.config.source_selector_help.as_str(),
1220 "type": "multiselect",
1221 "options": source_options,
1222 }])
1223 }
1224
1225 fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1226 let mut fields = combined.reportable_facet_fields(&request.facets);
1227 if request.data_only {
1228 if let Some(histogram) = &request.histogram {
1229 push_unique(&mut fields, histogram);
1230 }
1231 }
1232 let mut sorted = fields.clone();
1233 sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1234 let order_by_field: BTreeMap<String, usize> = sorted
1235 .into_iter()
1236 .enumerate()
1237 .map(|(index, field)| (field, index + 1))
1238 .collect();
1239
1240 fields
1241 .into_iter()
1242 .map(|field| {
1243 let order = order_by_field.get(&field).copied().unwrap_or(0);
1244 json!({
1245 "id": field,
1246 "name": field,
1247 "order": order,
1248 })
1249 })
1250 .collect()
1251 }
1252}
1253
1254struct HistogramChartMetadata {
1255 ids: Vec<Value>,
1256 names: Vec<Value>,
1257 colors: Vec<Value>,
1258 units: Vec<Value>,
1259 stats: Value,
1260 summary: Value,
1261 totals: Value,
1262 result_labels: Vec<Value>,
1263 min: u64,
1264 max: u64,
1265 points: u64,
1266}
1267
/// Computes `(min, max, sum, actual)` of one dimension's counts across all buckets.
///
/// A bucket that has no entry for `dimension` counts as 0. When `dimension` is not in
/// `actual_dimensions` the result is `(0, 0, 0, false)`; with no buckets at all it is
/// `(0, 0, 0, true)`.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }

    let mut counts = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0));
    let Some(first) = counts.next() else {
        return (0, 0, 0, true);
    };
    // Seed all three accumulators with the first bucket, then fold in the rest.
    let (lowest, highest, sum) = counts.fold(
        (first, first, first),
        |(lowest, highest, sum), count| (lowest.min(count), highest.max(count), sum + count),
    );
    (lowest, highest, sum, true)
}
1291
/// Returns `total / points` as a float, or 0.0 when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        count => total as f64 / count as f64,
    }
}
1299
1300fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1301 let mut node = json!({
1302 "mg": "default",
1303 "nm": "facets.histogram",
1304 "ni": 0,
1305 "st": { "ai": 0, "code": 200, "msg": "" },
1306 });
1307 add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1308 node
1309}
1310
1311fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1312 let mut context = json!({ "id": "facets.histogram" });
1313 add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1314 context
1315}
1316
1317fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1318 let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1319 add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1320 instance
1321}
1322
1323fn add_histogram_summary_shape(
1324 object: &mut Value,
1325 dimensions: u64,
1326 points: u64,
1327 stats: &Value,
1328 include_instances: bool,
1329) {
1330 let Some(map) = object.as_object_mut() else {
1331 return;
1332 };
1333 if dimensions > 0 {
1334 if include_instances {
1335 map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1336 }
1337 map.insert(
1338 "ds".to_string(),
1339 json!({ "sl": dimensions, "qr": dimensions }),
1340 );
1341 }
1342 if points > 0 {
1343 map.insert("sts".to_string(), stats.clone());
1344 }
1345}
1346
1347fn histogram_totals(dimensions: u64) -> Value {
1348 let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1349 if dimensions > 0 {
1350 if let Some(map) = totals.as_object_mut() {
1351 map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1352 map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1353 map.insert(
1354 "dimensions".to_string(),
1355 json!({ "sl": dimensions, "qr": dimensions }),
1356 );
1357 }
1358 }
1359 totals
1360}
1361
/// Maps `true` to 1 and `false` to 0.
fn bool_to_u64(value: bool) -> u64 {
    u64::from(value)
}
1365
1366fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1367 histogram
1368 .buckets
1369 .first()
1370 .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1371 .unwrap_or(0)
1372}
1373
1374fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1375 histogram
1376 .buckets
1377 .last()
1378 .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1379 .unwrap_or(0)
1380}
1381
/// Current wall-clock time as whole seconds since the Unix epoch; 0 if the system clock
/// reports a time before the epoch.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1388
1389#[derive(Debug, Clone)]
1390struct NetdataRequest {
1391 info: bool,
1392 echo: Value,
1393 after_realtime_usec: Option<u64>,
1394 before_realtime_usec: Option<u64>,
1395 if_modified_since_usec: u64,
1396 anchor: ExplorerAnchor,
1397 direction: Direction,
1398 limit: usize,
1399 data_only: bool,
1400 delta: bool,
1401 tail: bool,
1402 sampling: u64,
1403 source_type: u64,
1404 exact_sources: Vec<String>,
1405 filters: Vec<ExplorerFilter>,
1406 facets: Vec<Vec<u8>>,
1407 histogram: Option<String>,
1408 fts_terms: Vec<ExplorerFtsPattern>,
1409 fts_patterns: Vec<Vec<u8>>,
1410 fts_negative_patterns: Vec<Vec<u8>>,
1411}
1412
1413impl NetdataRequest {
1414 fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1415 let object = value.as_object().ok_or_else(|| {
1416 SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1417 })?;
1418 let now_seconds = unix_now_seconds();
1419 let info = get_bool(object, "info").unwrap_or(false);
1420 let after = get_i64(object, "after");
1421 let before = get_i64(object, "before");
1422 let (after_realtime_usec, before_realtime_usec) =
1423 normalize_time_window(now_seconds, after, before);
1424 let direction = request_direction(object);
1425 let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1426 let data_only = get_bool(object, "data_only").unwrap_or(false);
1427 let delta = request_delta(data_only, object);
1428 let tail = request_tail(data_only, if_modified_since_usec, object);
1429 let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1430 let (anchor, direction) = request_anchor_and_direction(
1431 object,
1432 tail,
1433 direction,
1434 after_realtime_usec,
1435 before_realtime_usec,
1436 );
1437 let requested_limit = request_limit(object);
1438 let limit = requested_limit.max(2);
1439 let requested_facets = parse_string_array(object.get("facets"));
1440 let facets = request_facets(&requested_facets, config);
1441 let requested_histogram = request_histogram(object);
1442 let histogram = request_histogram_or_default(&requested_histogram, config);
1443 let requested_query = request_query(object);
1444 let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1445 .as_deref()
1446 .map(parse_fts_query_patterns)
1447 .unwrap_or_default();
1448 let source_selection = parse_source_selection(object.get("selections"));
1449 let filters = parse_filters(object.get("selections"));
1450
1451 let echo_input = RequestEchoInput {
1452 info,
1453 after_realtime_usec,
1454 before_realtime_usec,
1455 if_modified_since_usec,
1456 anchor,
1457 direction,
1458 limit: requested_limit,
1459 data_only,
1460 delta,
1461 tail,
1462 sampling,
1463 source_type: source_selection.source_type,
1464 requested_facets: requested_facets.as_deref(),
1465 selections: object.get("selections"),
1466 histogram: requested_histogram.as_deref(),
1467 query: requested_query.as_deref(),
1468 };
1469 let echo = normalized_request_echo(&echo_input);
1470
1471 Ok(Self {
1472 info,
1473 echo,
1474 after_realtime_usec,
1475 before_realtime_usec,
1476 if_modified_since_usec,
1477 anchor,
1478 direction,
1479 limit,
1480 data_only,
1481 delta,
1482 tail,
1483 sampling,
1484 source_type: source_selection.source_type,
1485 exact_sources: source_selection.exact_sources,
1486 filters,
1487 facets,
1488 histogram,
1489 fts_terms,
1490 fts_patterns,
1491 fts_negative_patterns,
1492 })
1493 }
1494
1495 fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1496 if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1497 return true;
1498 }
1499 if self.source_type & SOURCE_TYPE_ALL != 0 {
1500 return true;
1501 }
1502 let file_source_type = metadata
1503 .and_then(|metadata| metadata.source_type)
1504 .unwrap_or_else(|| journal_file_source_type(path));
1505 if file_source_type & self.source_type != 0 {
1506 return true;
1507 }
1508 if self.exact_sources.is_empty() {
1509 return false;
1510 }
1511 let source_name = metadata
1512 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1513 .or_else(|| journal_file_exact_source_name(path));
1514 self.exact_sources
1515 .iter()
1516 .any(|source| source_name.as_deref() == Some(source.as_str()))
1517 }
1518
1519 fn to_explorer_query(
1520 &self,
1521 matched_files: u64,
1522 file_header: Option<FileHeader>,
1523 realtime_slack_usec: u64,
1524 ) -> ExplorerQuery {
1525 let analysis_enabled = !self.data_only || self.delta;
1526 let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1527 let backward_page_anchor = self.data_only
1528 && !tail_anchor
1529 && self.direction == Direction::Backward
1530 && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1531 let after_realtime_usec = if tail_anchor {
1532 tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
1533 } else {
1534 self.after_realtime_usec
1535 };
1536 let before_realtime_usec = if backward_page_anchor {
1537 before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
1538 } else {
1539 self.before_realtime_usec
1540 };
1541 let anchor = if tail_anchor || backward_page_anchor {
1542 ExplorerAnchor::Auto
1543 } else {
1544 self.anchor
1545 };
1546 let sampling = (analysis_enabled
1547 && self.sampling != 0
1548 && matched_files != 0
1549 && after_realtime_usec.is_some()
1550 && before_realtime_usec.is_some())
1551 .then(|| {
1552 let header = file_header.unwrap_or(FileHeader {
1553 signature: [0; 8],
1554 compatible_flags: 0,
1555 incompatible_flags: 0,
1556 state: 0,
1557 header_size: 0,
1558 n_entries: 0,
1559 head_entry_realtime: 0,
1560 tail_entry_realtime: 0,
1561 head_entry_seqnum: 0,
1562 tail_entry_seqnum: 0,
1563 tail_entry_boot_id: [0; 16],
1564 seqnum_id: [0; 16],
1565 });
1566 let messages_in_file = header
1567 .tail_entry_seqnum
1568 .checked_sub(header.head_entry_seqnum)
1569 .and_then(|span| span.checked_add(1))
1570 .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1571 .unwrap_or(header.n_entries);
1572 ExplorerSampling {
1573 budget: self.sampling,
1574 matched_files,
1575 file_head_realtime_usec: header.head_entry_realtime,
1576 file_tail_realtime_usec: header.tail_entry_realtime,
1577 file_head_seqnum: header.head_entry_seqnum,
1578 file_tail_seqnum: header.tail_entry_seqnum,
1579 file_entries: messages_in_file,
1580 }
1581 });
1582 ExplorerQuery {
1583 after_realtime_usec,
1584 before_realtime_usec,
1585 anchor,
1586 direction: self.direction,
1587 limit: self.limit,
1588 filters: self.filters.clone(),
1589 facets: analysis_enabled
1590 .then(|| self.facets.clone())
1591 .unwrap_or_default(),
1592 histogram: analysis_enabled
1593 .then(|| {
1594 self.histogram
1595 .as_ref()
1596 .map(|field| field.as_bytes().to_vec())
1597 })
1598 .flatten(),
1599 histogram_after_realtime_usec: self.after_realtime_usec,
1600 histogram_before_realtime_usec: self.before_realtime_usec,
1601 histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1602 fts_terms: self.fts_terms.clone(),
1603 fts_patterns: self.fts_patterns.clone(),
1604 fts_negative_patterns: self.fts_negative_patterns.clone(),
1605 field_mode: ExplorerFieldMode::FirstValue,
1606 exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1607 use_source_realtime: true,
1608 realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1609 stop_when_rows_full: self.data_only && !tail_anchor,
1610 stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1611 sampling,
1612 debug_collect_column_fields_by_row_traversal: false,
1613 }
1614 }
1615
1616 fn file_query(
1617 &self,
1618 matched_files: usize,
1619 file_header: FileHeader,
1620 order: &JournalFileOrderInfo,
1621 ) -> ExplorerQuery {
1622 let mut query = self.to_explorer_query(
1623 matched_files as u64,
1624 Some(file_header),
1625 order.journal_vs_realtime_delta_usec,
1626 );
1627 if self.data_only && self.delta {
1628 query.stop_when_rows_full = false;
1629 }
1630 query
1631 }
1632
1633 fn unfiltered_vocabulary(&self) -> Self {
1634 let mut request = self.clone();
1635 request.filters.clear();
1636 request.histogram = None;
1637 request.limit = 0;
1638 request.fts_terms.clear();
1639 request.fts_patterns.clear();
1640 request.fts_negative_patterns.clear();
1641 request
1642 }
1643
1644 fn distinct_filter_fields(&self) -> usize {
1645 self.filters
1646 .iter()
1647 .map(|filter| filter.field.as_slice())
1648 .collect::<HashSet<_>>()
1649 .len()
1650 }
1651}
1652
1653#[derive(Debug, Clone)]
1654struct LocatedRow {
1655 file_path: PathBuf,
1656 row: ExplorerRow,
1657}
1658
1659#[derive(Debug)]
1660struct NetdataRealtimeAdjuster {
1661 direction: Direction,
1662 last_realtime_from: u64,
1663 last_realtime_to: u64,
1664}
1665
1666impl NetdataRealtimeAdjuster {
1667 fn new(direction: Direction) -> Self {
1668 Self {
1669 direction,
1670 last_realtime_from: 0,
1671 last_realtime_to: 0,
1672 }
1673 }
1674
1675 fn adjust(&mut self, realtime_usec: u64) -> u64 {
1676 match self.direction {
1677 Direction::Backward => {
1678 if realtime_usec >= self.last_realtime_from
1679 && realtime_usec <= self.last_realtime_to
1680 {
1681 self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1682 self.last_realtime_from
1683 } else {
1684 self.last_realtime_from = realtime_usec;
1685 self.last_realtime_to = realtime_usec;
1686 realtime_usec
1687 }
1688 }
1689 Direction::Forward => {
1690 if realtime_usec >= self.last_realtime_from
1691 && realtime_usec <= self.last_realtime_to
1692 {
1693 self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1694 self.last_realtime_to
1695 } else {
1696 self.last_realtime_from = realtime_usec;
1697 self.last_realtime_to = realtime_usec;
1698 realtime_usec
1699 }
1700 }
1701 }
1702 }
1703}
1704
/// Aggregate description of the journal files that make up one selectable source.
#[derive(Debug, Default)]
struct JournalSourceSummary {
    /// Number of added paths whose filesystem metadata could be read.
    files: u64,
    /// Sum of the on-disk sizes of those files, in bytes.
    total_size: u64,
    /// Earliest first-entry timestamp seen so far, if any.
    first_realtime_usec: Option<u64>,
    /// Latest last-entry timestamp seen so far, if any.
    last_realtime_usec: Option<u64>,
}
1712
1713impl JournalSourceSummary {
/// Test helper: summarizes `paths` by adding each one with its looked-up file metadata.
#[cfg(test)]
fn from_paths(
    paths: &[PathBuf],
    reader_options: ReaderOptions,
    options: &NetdataFunctionRunOptions<'_>,
) -> Self {
    paths.iter().fold(Self::default(), |mut summary, path| {
        let metadata = file_metadata(options, path);
        summary.add_path(path, reader_options, metadata.as_ref());
        summary
    })
}
1727
1728 fn add_path(
1729 &mut self,
1730 path: &Path,
1731 reader_options: ReaderOptions,
1732 metadata: Option<&NetdataJournalFileMetadata>,
1733 ) {
1734 if let Ok(metadata) = std::fs::metadata(path) {
1735 self.files = self.files.saturating_add(1);
1736 self.total_size = self.total_size.saturating_add(metadata.len());
1737 }
1738 if let Some(metadata) = metadata {
1739 let metadata_first = metadata.msg_first_realtime_usec;
1740 let metadata_last = metadata.msg_last_realtime_usec;
1741 if let Some(first) = metadata_first {
1742 self.first_realtime_usec = Some(
1743 self.first_realtime_usec
1744 .map_or(first, |current| current.min(first)),
1745 );
1746 }
1747 if let Some(last) = metadata_last {
1748 self.last_realtime_usec = Some(
1749 self.last_realtime_usec
1750 .map_or(last, |current| current.max(last)),
1751 );
1752 }
1753 if metadata_first.is_some() && metadata_last.is_some() {
1754 return;
1755 }
1756 }
1757 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1758 return;
1759 };
1760 let header = reader.header();
1761 if header.head_entry_realtime != 0 {
1762 self.first_realtime_usec = Some(
1763 self.first_realtime_usec
1764 .map_or(header.head_entry_realtime, |current| {
1765 current.min(header.head_entry_realtime)
1766 }),
1767 );
1768 }
1769 if header.tail_entry_realtime != 0 {
1770 self.last_realtime_usec = Some(
1771 self.last_realtime_usec
1772 .map_or(header.tail_entry_realtime, |current| {
1773 current.max(header.tail_entry_realtime)
1774 }),
1775 );
1776 }
1777 }
1778
1779 fn info(&self) -> String {
1780 let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1781 (Some(first), Some(last)) if last > first && (last - first) >= 1_000_000 => {
1782 human_duration_seconds((last - first) / 1_000_000)
1783 }
1784 _ => "off".to_string(),
1785 };
1786 let last_entry = self
1787 .last_realtime_usec
1788 .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1789 .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1790 .unwrap_or_else(|| "unknown".to_string());
1791 format!(
1792 "{} files, total size {}, covering {}, last entry at {}",
1793 self.files,
1794 human_binary_size(self.total_size),
1795 coverage,
1796 last_entry
1797 )
1798 }
1799}
1800
1801fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1802 if summary.files == 0 {
1803 return;
1804 }
1805 target.push(json!({
1806 "id": id,
1807 "name": id,
1808 "info": summary.info(),
1809 "pill": human_binary_size(summary.total_size),
1810 }));
1811}
1812
1813fn expand_located_row_payloads(
1814 located: &mut LocatedRow,
1815 reader_options: ReaderOptions,
1816) -> Result<()> {
1817 let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1818 reader.seek_cursor(&located.row.cursor)?;
1819 if !reader.test_cursor(&located.row.cursor)? {
1820 return Err(SdkError::InvalidCursor(format!(
1821 "selected row cursor is no longer available: {}",
1822 located.row.cursor
1823 )));
1824 }
1825 reader.collect_entry_payloads(&mut located.row.payloads)
1826}
1827
1828#[derive(Debug, Default)]
1829struct CombinedResult {
1830 rows: Vec<LocatedRow>,
1831 facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1832 histogram: Option<ExplorerHistogram>,
1833 column_fields: BTreeSet<String>,
1834 stats: ExplorerStats,
1835 page_counters: Option<NetdataPageCounters>,
1836 matched_files: u64,
1837 matched_paths: Vec<PathBuf>,
1838 skipped_files: u64,
1839 file_errors: Vec<String>,
1840 partial: bool,
1841 timed_out: bool,
1842 cancelled: bool,
1843 sampling_enabled: bool,
1844}
1845
/// Snapshot of a page window's counters, as returned by `NetdataPageWindow::counters`.
#[derive(Clone, Copy, Debug, Default)]
struct NetdataPageCounters {
    /// Entries that passed the anchor check.
    matched: u64,
    /// Entries counted as skipped before the page.
    before: u64,
    /// Entries counted as skipped after the page, plus entries shifted out of it.
    after: u64,
}
1852
/// Position and timing information carried along while files are processed.
/// NOTE(review): how these values are reported is not visible in this section.
#[derive(Clone, Copy)]
struct ProgressContext {
    /// Index of the file currently being processed.
    current_file: usize,
    /// Total number of files to process.
    total_files: usize,
    /// Instant at which processing started.
    started: Instant,
}
1859
/// Heap of retained timestamps, shaped so that `peek` yields the entry to evict next.
#[derive(Debug)]
enum NetdataPageHeap {
    /// Backward paging: timestamps are wrapped in `Reverse`, so `peek` is the oldest one.
    Backward(BinaryHeap<Reverse<u64>>),
    /// Forward paging: plain max-heap, so `peek` is the newest one.
    Forward(BinaryHeap<u64>),
}
1865
1866#[derive(Debug)]
1867struct NetdataPageWindow {
1868 direction: Direction,
1869 anchor_start_usec: Option<u64>,
1870 anchor_stop_usec: Option<u64>,
1871 limit: usize,
1872 heap: NetdataPageHeap,
1873 oldest_retained_usec: Option<u64>,
1874 newest_retained_usec: Option<u64>,
1875 matched: u64,
1876 skips_before: u64,
1877 skips_after: u64,
1878 shifts: u64,
1879}
1880
1881impl NetdataPageWindow {
1882 fn for_request(request: &NetdataRequest) -> Self {
1883 let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1884 (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1885 (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1886 _ => (None, None),
1887 };
1888 let heap = match request.direction {
1889 Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1890 Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1891 };
1892 Self {
1893 direction: request.direction,
1894 anchor_start_usec,
1895 anchor_stop_usec,
1896 limit: request.limit,
1897 heap,
1898 oldest_retained_usec: None,
1899 newest_retained_usec: None,
1900 matched: 0,
1901 skips_before: 0,
1902 skips_after: if request.tail
1903 && request.delta
1904 && matches!(request.anchor, ExplorerAnchor::Realtime(_))
1905 {
1906 1
1907 } else {
1908 0
1909 },
1910 shifts: 0,
1911 }
1912 }
1913
1914 fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1915 if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1916 return false;
1917 }
1918 if self.retained_len() < self.limit {
1919 return true;
1920 }
1921 self.oldest_retained_usec
1922 .zip(self.newest_retained_usec)
1923 .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1924 }
1925
1926 fn observe(&mut self, realtime_usec: u64) {
1927 if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1928 return;
1929 }
1930 self.matched = self.matched.saturating_add(1);
1931 match (&mut self.heap, self.direction) {
1932 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1933 if heap.len() < self.limit {
1934 heap.push(Reverse(realtime_usec));
1935 self.add_retained_bound(realtime_usec);
1936 return;
1937 }
1938 let Some(Reverse(oldest)) = heap.peek().copied() else {
1939 heap.push(Reverse(realtime_usec));
1940 self.refresh_retained_bounds();
1941 return;
1942 };
1943 if realtime_usec < oldest {
1944 self.skips_after = self.skips_after.saturating_add(1);
1945 return;
1946 }
1947 heap.pop();
1948 heap.push(Reverse(realtime_usec));
1949 self.refresh_retained_bounds();
1950 self.shifts = self.shifts.saturating_add(1);
1951 }
1952 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1953 if heap.len() < self.limit {
1954 heap.push(realtime_usec);
1955 self.add_retained_bound(realtime_usec);
1956 return;
1957 }
1958 let Some(newest) = heap.peek().copied() else {
1959 heap.push(realtime_usec);
1960 self.refresh_retained_bounds();
1961 return;
1962 };
1963 if realtime_usec > newest {
1964 self.skips_before = self.skips_before.saturating_add(1);
1965 return;
1966 }
1967 heap.pop();
1968 heap.push(realtime_usec);
1969 self.refresh_retained_bounds();
1970 self.shifts = self.shifts.saturating_add(1);
1971 }
1972 _ => {}
1973 }
1974 }
1975
1976 fn retained_len(&self) -> usize {
1977 match &self.heap {
1978 NetdataPageHeap::Backward(heap) => heap.len(),
1979 NetdataPageHeap::Forward(heap) => heap.len(),
1980 }
1981 }
1982
1983 fn add_retained_bound(&mut self, realtime_usec: u64) {
1984 self.oldest_retained_usec = Some(
1985 self.oldest_retained_usec
1986 .map_or(realtime_usec, |current| current.min(realtime_usec)),
1987 );
1988 self.newest_retained_usec = Some(
1989 self.newest_retained_usec
1990 .map_or(realtime_usec, |current| current.max(realtime_usec)),
1991 );
1992 }
1993
1994 fn refresh_retained_bounds(&mut self) {
1995 let (oldest, newest) = match &self.heap {
1996 NetdataPageHeap::Backward(heap) => heap
1997 .iter()
1998 .map(|Reverse(usec)| *usec)
1999 .fold((None, None), retained_bounds_fold),
2000 NetdataPageHeap::Forward(heap) => heap
2001 .iter()
2002 .copied()
2003 .fold((None, None), retained_bounds_fold),
2004 };
2005 self.oldest_retained_usec = oldest;
2006 self.newest_retained_usec = newest;
2007 }
2008
2009 fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
2010 match self.direction {
2011 Direction::Backward => {
2012 if self
2013 .anchor_start_usec
2014 .is_some_and(|anchor| realtime_usec >= anchor)
2015 {
2016 self.skips_before = self.skips_before.saturating_add(1);
2017 return false;
2018 }
2019 if self
2020 .anchor_stop_usec
2021 .is_some_and(|anchor| realtime_usec <= anchor)
2022 {
2023 self.skips_after = self.skips_after.saturating_add(1);
2024 return false;
2025 }
2026 }
2027 Direction::Forward => {
2028 if self
2029 .anchor_start_usec
2030 .is_some_and(|anchor| realtime_usec <= anchor)
2031 {
2032 self.skips_after = self.skips_after.saturating_add(1);
2033 return false;
2034 }
2035 if self
2036 .anchor_stop_usec
2037 .is_some_and(|anchor| realtime_usec >= anchor)
2038 {
2039 self.skips_before = self.skips_before.saturating_add(1);
2040 return false;
2041 }
2042 }
2043 }
2044 true
2045 }
2046
2047 fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2048 match self.direction {
2049 Direction::Backward => {
2050 if self
2051 .anchor_start_usec
2052 .is_some_and(|anchor| realtime_usec >= anchor)
2053 {
2054 return false;
2055 }
2056 if self
2057 .anchor_stop_usec
2058 .is_some_and(|anchor| realtime_usec <= anchor)
2059 {
2060 return false;
2061 }
2062 }
2063 Direction::Forward => {
2064 if self
2065 .anchor_start_usec
2066 .is_some_and(|anchor| realtime_usec <= anchor)
2067 {
2068 return false;
2069 }
2070 if self
2071 .anchor_stop_usec
2072 .is_some_and(|anchor| realtime_usec >= anchor)
2073 {
2074 return false;
2075 }
2076 }
2077 }
2078 true
2079 }
2080
2081 fn counters(&self) -> NetdataPageCounters {
2082 NetdataPageCounters {
2083 matched: self.matched,
2084 before: self.skips_before,
2085 after: self.skips_after.saturating_add(self.shifts),
2086 }
2087 }
2088
2089 fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2090 if self.limit == 0 {
2091 return false;
2092 }
2093 match (&self.heap, self.direction) {
2094 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2095 if heap.len() < self.limit {
2096 return false;
2097 }
2098 heap.peek().is_some_and(|Reverse(oldest)| {
2099 realtime_usec < oldest.saturating_sub(slack_usec)
2100 })
2101 }
2102 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2103 if heap.len() < self.limit {
2104 return false;
2105 }
2106 heap.peek()
2107 .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2108 }
2109 _ => false,
2110 }
2111 }
2112}
2113
/// Fold step that widens an `(oldest, newest)` pair so that it includes `realtime_usec`.
/// Both halves of the result are always `Some`.
fn retained_bounds_fold(
    bounds: (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let oldest = match bounds.0 {
        Some(current) if current < realtime_usec => current,
        _ => realtime_usec,
    };
    let newest = match bounds.1 {
        Some(current) if current > realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(oldest), Some(newest))
}
2123
2124impl CombinedResult {
2125 fn merge(
2126 &mut self,
2127 path: &Path,
2128 result: ExplorerResult,
2129 direction: Direction,
2130 limit: usize,
2131 ) -> Result<()> {
2132 let ExplorerResult {
2133 rows,
2134 facets,
2135 histogram,
2136 stats,
2137 column_fields,
2138 ..
2139 } = result;
2140
2141 if let Some(histogram) = histogram {
2142 merge_histogram(&mut self.histogram, histogram)?;
2143 }
2144
2145 self.merge_stats(stats);
2146 for row in rows {
2147 self.rows.push(LocatedRow {
2148 file_path: path.to_path_buf(),
2149 row,
2150 });
2151 }
2152 for field in column_fields {
2153 if let Ok(field) = String::from_utf8(field) {
2154 self.column_fields.insert(field);
2155 }
2156 }
2157 for (field, values) in facets {
2158 let target = self.facets.entry(field).or_default();
2159 for (value, count) in values {
2160 add_netdata_facet_count(target, &value, count);
2161 }
2162 }
2163 self.sort_and_limit(direction, limit);
2164 Ok(())
2165 }
2166
2167 fn add_column_fields<I>(&mut self, fields: I)
2168 where
2169 I: IntoIterator<Item = String>,
2170 {
2171 self.column_fields.extend(fields);
2172 }
2173
2174 fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2175 match direction {
2176 Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2177 Direction::Backward => self
2178 .rows
2179 .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2180 }
2181 make_row_timestamps_unique(&mut self.rows, direction);
2182 if self.rows.len() > limit {
2183 self.rows.truncate(limit);
2184 }
2185 self.stats.rows_returned = self.rows.len() as u64;
2186 }
2187
2188 fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2189 if self.rows.is_empty() {
2190 self.stats.rows_returned = 0;
2191 return;
2192 }
2193
2194 let mut rows = Vec::with_capacity(self.rows.len());
2195 for mut located in self.rows.drain(..) {
2196 if !located.row.payloads.is_empty() {
2197 rows.push(located);
2198 continue;
2199 }
2200 match expand_located_row_payloads(&mut located, reader_options) {
2201 Ok(()) => {
2202 self.stats.returned_row_expansions =
2203 self.stats.returned_row_expansions.saturating_add(1);
2204 rows.push(located);
2205 }
2206 Err(err) => {
2207 self.partial = true;
2208 self.file_errors.push(format!(
2209 "{} cursor {}: {err}",
2210 located.file_path.display(),
2211 located.row.cursor
2212 ));
2213 }
2214 }
2215 }
2216 self.rows = rows;
2217 self.stats.rows_returned = self.rows.len() as u64;
2218 }
2219
2220 fn merge_stats(&mut self, stats: ExplorerStats) {
2221 self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2222 self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2223 self.stats.facet_rows_matched = self
2224 .stats
2225 .facet_rows_matched
2226 .saturating_add(stats.facet_rows_matched);
2227 self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2228 self.stats.rows_unsampled = self
2229 .stats
2230 .rows_unsampled
2231 .saturating_add(stats.rows_unsampled);
2232 self.stats.rows_estimated = self
2233 .stats
2234 .rows_estimated
2235 .saturating_add(stats.rows_estimated);
2236 self.stats.sampling_sampled = self
2237 .stats
2238 .sampling_sampled
2239 .saturating_add(stats.sampling_sampled);
2240 self.stats.sampling_unsampled = self
2241 .stats
2242 .sampling_unsampled
2243 .saturating_add(stats.sampling_unsampled);
2244 self.stats.sampling_estimated = self
2245 .stats
2246 .sampling_estimated
2247 .saturating_add(stats.sampling_estimated);
2248 if stats.last_realtime_usec > self.stats.last_realtime_usec {
2249 self.stats.last_realtime_usec = stats.last_realtime_usec;
2250 }
2251 if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2252 self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2253 }
2254 self.stats.data_refs_seen = self
2255 .stats
2256 .data_refs_seen
2257 .saturating_add(stats.data_refs_seen);
2258 self.stats.data_refs_skipped = self
2259 .stats
2260 .data_refs_skipped
2261 .saturating_add(stats.data_refs_skipped);
2262 self.stats.data_payloads_loaded = self
2263 .stats
2264 .data_payloads_loaded
2265 .saturating_add(stats.data_payloads_loaded);
2266 self.stats.data_objects_classified = self
2267 .stats
2268 .data_objects_classified
2269 .saturating_add(stats.data_objects_classified);
2270 self.stats.data_cache_hits = self
2271 .stats
2272 .data_cache_hits
2273 .saturating_add(stats.data_cache_hits);
2274 self.stats.data_cache_misses = self
2275 .stats
2276 .data_cache_misses
2277 .saturating_add(stats.data_cache_misses);
2278 self.stats.payloads_decompressed = self
2279 .stats
2280 .payloads_decompressed
2281 .saturating_add(stats.payloads_decompressed);
2282 self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2283 self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2284 self.stats.histogram_updates = self
2285 .stats
2286 .histogram_updates
2287 .saturating_add(stats.histogram_updates);
2288 self.stats.returned_row_expansions = self
2289 .stats
2290 .returned_row_expansions
2291 .saturating_add(stats.returned_row_expansions);
2292 self.stats.early_stop_opportunities = self
2293 .stats
2294 .early_stop_opportunities
2295 .saturating_add(stats.early_stop_opportunities);
2296 self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2297 }
2298
2299 fn add_zero_count_facet_values(
2300 &mut self,
2301 vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2302 ) {
2303 for (field, values) in vocabulary {
2304 let target = self.facets.entry(field.clone()).or_default();
2305 for value in values.keys() {
2306 add_netdata_facet_count(target, value, 0);
2307 }
2308 }
2309 }
2310
2311 fn add_zero_count_facet_values_from_files(
2312 &mut self,
2313 fields: &[Vec<u8>],
2314 reader_options: ReaderOptions,
2315 ) {
2316 for path in &self.matched_paths {
2317 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2318 continue;
2319 };
2320 for field in fields {
2321 let Ok(field_name) = std::str::from_utf8(field) else {
2322 continue;
2323 };
2324 let Ok(values) = reader.query_unique(field_name) else {
2325 continue;
2326 };
2327 if values.is_empty() {
2328 continue;
2329 }
2330 let target = self.facets.entry(field.clone()).or_default();
2331 for value in values {
2332 add_netdata_facet_count(target, &value, 0);
2333 }
2334 }
2335 }
2336 }
2337
2338 fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2339 let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2340 if let Some(histogram) = &request.histogram {
2341 report_fields.insert(histogram.as_bytes().to_vec());
2342 }
2343 for filter in &request.filters {
2344 if !report_fields.contains(&filter.field) {
2345 continue;
2346 }
2347 let target = self.facets.entry(filter.field.clone()).or_default();
2348 for value in &filter.values {
2349 add_netdata_facet_count(target, value, 0);
2350 }
2351 }
2352 }
2353
2354 fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2355 string_fields(&self.reportable_facet_fields_bytes(requested))
2356 }
2357
2358 fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2359 let mut fields = Vec::new();
2360 for field in requested {
2361 if !fields.contains(field) {
2362 fields.push(field.clone());
2363 }
2364 }
2365 fields
2366 }
2367}
2368
2369fn netdata_function_error(status: u64, message: &str) -> Value {
2370 json!({
2371 "status": status,
2372 "errorMessage": message,
2373 })
2374}
2375
2376fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2377 if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2378 return Value::String("OK".to_string());
2379 }
2380
2381 let total = stats
2382 .rows_examined
2383 .saturating_add(stats.rows_unsampled)
2384 .saturating_add(stats.rows_estimated)
2385 .max(1);
2386 let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2387 let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2388 let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2389
2390 let mut title = String::new();
2391 let mut description = String::new();
2392 let mut status = "notice";
2393 if timed_out {
2394 title.push_str("Query timed-out, incomplete data. ");
2395 description.push_str(
2396 "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2397 );
2398 status = "warning";
2399 }
2400 if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2401 title.push_str(&format!("{real_percent:.2}% real data"));
2402 description.push_str(&format!(
2403 "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2404 ));
2405 }
2406 if stats.rows_unsampled != 0 {
2407 title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2408 description.push_str(&format!(
2409 "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2410 ));
2411 }
2412 if stats.rows_estimated != 0 {
2413 title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2414 description.push_str(&format!(
2415 "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2416 ));
2417 }
2418
2419 json!({
2420 "title": title,
2421 "status": status,
2422 "description": description,
2423 })
2424}
2425
2426fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2427 let mut stats = completed.clone();
2428 stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2429 stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2430 stats.facet_rows_matched = stats
2431 .facet_rows_matched
2432 .saturating_add(current.facet_rows_matched);
2433 stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2434 stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2435 stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2436 stats.sampling_sampled = stats
2437 .sampling_sampled
2438 .saturating_add(current.sampling_sampled);
2439 stats.sampling_unsampled = stats
2440 .sampling_unsampled
2441 .saturating_add(current.sampling_unsampled);
2442 stats.sampling_estimated = stats
2443 .sampling_estimated
2444 .saturating_add(current.sampling_estimated);
2445 if current.last_realtime_usec > stats.last_realtime_usec {
2446 stats.last_realtime_usec = current.last_realtime_usec;
2447 }
2448 if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2449 stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2450 }
2451 stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2452 stats.data_refs_skipped = stats
2453 .data_refs_skipped
2454 .saturating_add(current.data_refs_skipped);
2455 stats.data_payloads_loaded = stats
2456 .data_payloads_loaded
2457 .saturating_add(current.data_payloads_loaded);
2458 stats.data_objects_classified = stats
2459 .data_objects_classified
2460 .saturating_add(current.data_objects_classified);
2461 stats.data_cache_hits = stats
2462 .data_cache_hits
2463 .saturating_add(current.data_cache_hits);
2464 stats.data_cache_misses = stats
2465 .data_cache_misses
2466 .saturating_add(current.data_cache_misses);
2467 stats.payloads_decompressed = stats
2468 .payloads_decompressed
2469 .saturating_add(current.payloads_decompressed);
2470 stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2471 stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2472 stats.histogram_updates = stats
2473 .histogram_updates
2474 .saturating_add(current.histogram_updates);
2475 stats.returned_row_expansions = stats
2476 .returned_row_expansions
2477 .saturating_add(current.returned_row_expansions);
2478 stats.early_stop_opportunities = stats
2479 .early_stop_opportunities
2480 .saturating_add(current.early_stop_opportunities);
2481 stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2482 stats
2483}
2484
/// Column description of a response table.
struct Columns {
    // Column keys as an ordered list.
    // NOTE(review): presumably the display order — confirm against the builder.
    order: Vec<String>,
    // JSON object keyed by column; emitted as the response's "columns" value.
    map: Map<String, Value>,
}
2489
/// Pre-rendered pieces of a query response, assembled before the final JSON
/// object is built.
struct QueryResponseArtifacts {
    // Names of the facet fields that are reported.
    reportable_facet_field_names: Vec<String>,
    // Column metadata; its `map` is emitted as "columns".
    columns: Columns,
    // Rendered rows; emitted as "data".
    data: Vec<Value>,
    // Emitted as "facets" or "facets_delta".
    facets: Value,
    // Emitted as "histogram" or "histogram_delta"; `None` becomes JSON null.
    histogram: Option<Value>,
    // NOTE(review): presumably the value produced by `query_message` — confirm.
    message: Value,
    // Emitted as "items" or "items_delta".
    items: Value,
}
2499
2500fn base_query_response(
2501 request: &NetdataRequest,
2502 combined: &CombinedResult,
2503 artifacts: &QueryResponseArtifacts,
2504) -> Value {
2505 json!({
2506 "_request": &request.echo,
2507 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2508 "_journal_files": {
2509 "matched": combined.matched_files,
2510 "skipped": combined.skipped_files,
2511 "errors": &combined.file_errors,
2512 },
2513 "status": 200,
2514 "partial": combined.partial,
2515 "type": "table",
2516 "show_ids": true,
2517 "has_history": true,
2518 "pagination": {
2519 "enabled": true,
2520 "key": "anchor",
2521 "column": "timestamp",
2522 "units": "timestamp_usec",
2523 },
2524 "columns": &artifacts.columns.map,
2525 "data": &artifacts.data,
2526 "_stats": {
2527 "sdk_explorer": &combined.stats,
2528 },
2529 "expires": if request.data_only {
2530 unix_now_seconds().saturating_add(3600)
2531 } else {
2532 0
2533 }
2534 })
2535}
2536
2537fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2538 let unsampled = combined.stats.rows_unsampled;
2539 let estimated = combined.stats.rows_estimated;
2540 let fallback_rows_after_returned =
2541 response_fallback_rows_after_returned(&combined.stats, returned);
2542 let page_counters = combined
2543 .page_counters
2544 .unwrap_or_else(|| NetdataPageCounters {
2545 matched: combined.stats.rows_matched,
2546 before: 0,
2547 after: fallback_rows_after_returned,
2548 });
2549 json!({
2550 "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2551 "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2552 "unsampled": unsampled,
2553 "estimated": estimated,
2554 "returned": returned,
2555 "max_to_return": request.limit as u64,
2556 "before": page_counters.before,
2557 "after": page_counters.after,
2558 })
2559}
2560
2561fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2562 let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2563 stats.rows_examined
2564 } else {
2565 stats.rows_matched
2566 };
2567 source_rows.saturating_sub(returned)
2568}
2569
2570fn add_last_modified_if_needed(
2571 object: &mut Map<String, Value>,
2572 request: &NetdataRequest,
2573 combined: &CombinedResult,
2574) {
2575 if !request.data_only || request.tail {
2576 object.insert(
2577 "last_modified".to_string(),
2578 Value::from(combined.stats.last_realtime_usec),
2579 );
2580 }
2581}
2582
2583fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2584 if combined.sampling_enabled {
2585 object.insert(
2586 "_sampling".to_string(),
2587 json!({
2588 "enabled": true,
2589 "sampled": combined.stats.sampling_sampled,
2590 "unsampled": combined.stats.sampling_unsampled,
2591 "estimated": combined.stats.sampling_estimated,
2592 }),
2593 );
2594 }
2595}
2596
2597fn add_analysis_outputs_if_needed(
2598 object: &mut Map<String, Value>,
2599 request: &NetdataRequest,
2600 artifacts: QueryResponseArtifacts,
2601) {
2602 if !request.data_only || request.delta {
2603 let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2604 object.insert(facets_key.to_string(), artifacts.facets);
2605 object.insert(
2606 histogram_key.to_string(),
2607 artifacts.histogram.unwrap_or(Value::Null),
2608 );
2609 object.insert(items_key.to_string(), artifacts.items);
2610 }
2611}
2612
/// Response keys for the `(facets, histogram, items)` outputs: the `_delta`
/// variants for data-only responses, the plain names otherwise.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2620
2621fn merge_histogram(
2622 target: &mut Option<ExplorerHistogram>,
2623 source: ExplorerHistogram,
2624) -> Result<()> {
2625 let Some(target) = target else {
2626 *target = Some(source);
2627 return Ok(());
2628 };
2629 if target.field != source.field
2630 || target.buckets.len() != source.buckets.len()
2631 || target
2632 .buckets
2633 .iter()
2634 .zip(source.buckets.iter())
2635 .any(|(target_bucket, source_bucket)| {
2636 target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2637 || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2638 })
2639 {
2640 return Err(SdkError::Unsupported(
2641 "inconsistent Netdata histogram bucket shape",
2642 ));
2643 }
2644 for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2645 let Some(target_bucket) = target.buckets.get_mut(index) else {
2646 return Err(SdkError::Unsupported(
2647 "inconsistent Netdata histogram bucket shape",
2648 ));
2649 };
2650 for (value, count) in source_bucket.values {
2651 *target_bucket.values.entry(value).or_default() += count;
2652 }
2653 }
2654 Ok(())
2655}
2656
/// Returns `true` when the group has at least one value that is neither
/// empty nor the single-dash placeholder `-`. Counts are not considered.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    for value in values.keys() {
        let is_placeholder = value.is_empty() || value.as_slice() == b"-";
        if !is_placeholder {
            return true;
        }
    }
    false
}
2662
2663fn netdata_facet_value(value: &[u8]) -> &[u8] {
2664 if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2665 &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2666 } else {
2667 value
2668 }
2669}
2670
2671fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2672 *target
2673 .entry(netdata_facet_value(value).to_vec())
2674 .or_default() += count;
2675}
2676
2677fn not_modified_before_scan_response(
2678 request: &NetdataRequest,
2679 selected: &SelectedJournalFiles,
2680) -> Option<Value> {
2681 if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2682 Some(netdata_function_error(
2683 304,
2684 "No new data since the previous call.",
2685 ))
2686 } else {
2687 None
2688 }
2689}
2690
2691fn should_collect_unfiltered_facet_vocabulary(
2692 request: &NetdataRequest,
2693 combined: &CombinedResult,
2694) -> bool {
2695 !request.data_only && !combined.partial && !request.filters.is_empty()
2696}
2697
2698fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2699 ProgressContext {
2700 current_file: file_index + 1,
2701 total_files,
2702 started,
2703 }
2704}
2705
2706fn emit_netdata_progress(
2707 options: &mut NetdataFunctionRunOptions<'_>,
2708 progress: NetdataFunctionProgress,
2709) {
2710 if let Some(callback) = options.progress_callback.as_deref_mut() {
2711 callback(progress);
2712 }
2713}
2714
2715fn emit_progress_for_combined(
2716 options: &mut NetdataFunctionRunOptions<'_>,
2717 combined: &CombinedResult,
2718 context: ProgressContext,
2719) {
2720 emit_netdata_progress(
2721 options,
2722 NetdataFunctionProgress {
2723 current_file: context.current_file,
2724 total_files: context.total_files,
2725 matched_files: combined.matched_files,
2726 skipped_files: combined.skipped_files,
2727 stats: combined.stats.clone(),
2728 elapsed: context.started.elapsed(),
2729 },
2730 );
2731}
2732
2733fn emit_explorer_progress(
2734 options: &mut NetdataFunctionRunOptions<'_>,
2735 combined: &CombinedResult,
2736 progress: ExplorerProgress,
2737 context: ProgressContext,
2738) {
2739 let stats = merged_progress_stats(&combined.stats, &progress.stats);
2740 if let Some(callback) = options.progress_callback.as_deref_mut() {
2741 callback(NetdataFunctionProgress {
2742 current_file: context.current_file,
2743 total_files: context.total_files,
2744 matched_files: combined.matched_files,
2745 skipped_files: combined.skipped_files,
2746 stats,
2747 elapsed: context.started.elapsed(),
2748 });
2749 }
2750}
2751
2752fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2753 options
2754 .cancellation_callback
2755 .is_some_and(|is_cancelled| is_cancelled())
2756}
2757
2758fn should_stop_before_file(
2759 combined: &mut CombinedResult,
2760 deadline: Option<Instant>,
2761 options: &NetdataFunctionRunOptions<'_>,
2762) -> bool {
2763 if request_cancelled(options) {
2764 combined.partial = true;
2765 combined.cancelled = true;
2766 return true;
2767 }
2768 if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2769 combined.partial = true;
2770 combined.timed_out = true;
2771 return true;
2772 }
2773 false
2774}
2775
2776fn collect_column_fields_for_file(
2777 reader: &mut FileReader,
2778 request: &NetdataRequest,
2779 path: &Path,
2780 combined: &mut CombinedResult,
2781) {
2782 if request.data_only {
2783 return;
2784 }
2785 match reader.enumerate_fields_indexed() {
2786 Ok(fields) => combined.add_column_fields(fields),
2787 Err(err) => combined.file_errors.push(format!(
2788 "{}: FIELD index enumeration failed: {err}",
2789 path.display()
2790 )),
2791 }
2792}
2793
2794fn record_explore_result(
2795 result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2796 path: &Path,
2797 combined: &mut CombinedResult,
2798) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2799 match result {
2800 Ok(result) => Some(result),
2801 Err(err) => {
2802 combined.skipped_files = combined.skipped_files.saturating_add(1);
2803 combined
2804 .file_errors
2805 .push(format!("{}: {err}", path.display()));
2806 None
2807 }
2808 }
2809}
2810
2811fn delta_scan_can_stop(
2812 request: &NetdataRequest,
2813 page_window: &RefCell<NetdataPageWindow>,
2814 realtime_usec: u64,
2815 rows_matched: u64,
2816 realtime_delta_usec: u64,
2817) -> bool {
2818 let mut page_window = page_window.borrow_mut();
2819 page_window.observe(realtime_usec);
2820 request.data_only
2821 && request.delta
2822 && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2823 && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2824}
2825
2826#[allow(clippy::too_many_arguments)]
2827fn finish_explored_file(
2828 options: &mut NetdataFunctionRunOptions<'_>,
2829 request: &NetdataRequest,
2830 file: &SelectedJournalFile,
2831 query: &ExplorerQuery,
2832 result: ExplorerResult,
2833 stop_reason: Option<ExplorerStopReason>,
2834 combined: &mut CombinedResult,
2835 files: &[SelectedJournalFile],
2836 file_index: usize,
2837 progress: ProgressContext,
2838) -> Result<bool> {
2839 update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2840 combined.merge(&file.path, result, query.direction, request.limit)?;
2841 emit_progress_for_combined(options, combined, progress);
2842 if request_cancelled(options) {
2843 combined.partial = true;
2844 combined.cancelled = true;
2845 return Ok(true);
2846 }
2847 if let Some(reason) = stop_reason {
2848 combined.partial = true;
2849 match reason {
2850 ExplorerStopReason::TimedOut => combined.timed_out = true,
2851 ExplorerStopReason::Cancelled => combined.cancelled = true,
2852 }
2853 return Ok(true);
2854 }
2855 Ok(request.data_only
2856 && !request.delta
2857 && !request.tail
2858 && combined.rows.len() >= request.limit
2859 && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2860}
2861
2862fn file_metadata(
2863 options: &NetdataFunctionRunOptions<'_>,
2864 path: &Path,
2865) -> Option<NetdataJournalFileMetadata> {
2866 options
2867 .state
2868 .as_deref()
2869 .and_then(|state| state.file_metadata(path))
2870}
2871
2872fn update_learned_realtime_delta(
2873 options: &mut NetdataFunctionRunOptions<'_>,
2874 path: &Path,
2875 order: &JournalFileOrderInfo,
2876 stats: &ExplorerStats,
2877) {
2878 let learned_delta = stats.max_source_realtime_delta_usec;
2879 if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2880 return;
2881 }
2882 let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2883 if learned_delta <= order.journal_vs_realtime_delta_usec {
2884 return;
2885 }
2886 if let Some(state) = options.state.as_deref_mut() {
2887 state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2888 }
2889}
2890
2891fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2892 delta_usec
2893 .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2894 .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2895}
2896
/// Test-only check of whether a file's header time range can intersect the
/// request window.
///
/// A zero tail timestamp always counts as overlapping. Otherwise the
/// head/tail range is widened by `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC`
/// on both sides before it is compared with the request's optional `after`
/// and `before` bounds.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let earliest = header
        .head_entry_realtime
        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
    let latest = header
        .tail_entry_realtime
        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);

    !(ends_before_window || starts_after_window)
}
2925
/// A journal file chosen for a request, together with the ordering
/// information used to sort it and to test it against the request window.
#[derive(Debug)]
struct SelectedJournalFile {
    // Location of the journal file.
    path: PathBuf,
    // First/last message timestamps and the journal-vs-realtime delta.
    order: JournalFileOrderInfo,
}
2931
/// The files selected for a request, in scan order.
#[derive(Debug, Default)]
struct SelectedJournalFiles {
    // Selected files, sorted for the request direction.
    files: Vec<SelectedJournalFile>,
    // True when any selected file's last message timestamp is greater than
    // the request's `if_modified_since_usec`.
    files_are_newer: bool,
}
2937
2938fn select_journal_files_for_request(
2939 paths: Vec<PathBuf>,
2940 request: &NetdataRequest,
2941 reader_options: ReaderOptions,
2942 options: &NetdataFunctionRunOptions<'_>,
2943) -> SelectedJournalFiles {
2944 let mut selected = Vec::new();
2945 for path in paths {
2946 let metadata = file_metadata(options, &path);
2947 if !request.matches_source(&path, metadata.as_ref()) {
2948 continue;
2949 }
2950 let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2951 if !journal_file_order_may_overlap_request(&order, request) {
2952 continue;
2953 }
2954 selected.push(SelectedJournalFile { path, order });
2955 }
2956 selected.sort_by(|left, right| {
2957 compare_journal_file_order(&left.order, &right.order, request.direction)
2958 .then_with(|| left.path.cmp(&right.path))
2959 });
2960 let files_are_newer = selected
2961 .iter()
2962 .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2963 SelectedJournalFiles {
2964 files: selected,
2965 files_are_newer,
2966 }
2967}
2968
2969fn remaining_files_cannot_affect_data_page(
2970 combined: &CombinedResult,
2971 request: &NetdataRequest,
2972 files: &[SelectedJournalFile],
2973 next_file_index: usize,
2974) -> bool {
2975 let Some(next) = files.get(next_file_index) else {
2976 return true;
2977 };
2978 let slack = next.order.journal_vs_realtime_delta_usec;
2979 match request.direction {
2980 Direction::Backward => {
2981 let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2982 else {
2983 return false;
2984 };
2985 next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2986 }
2987 Direction::Forward => {
2988 let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2989 else {
2990 return false;
2991 };
2992 next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2993 }
2994 }
2995}
2996
2997fn journal_file_order_may_overlap_request(
2998 info: &JournalFileOrderInfo,
2999 request: &NetdataRequest,
3000) -> bool {
3001 if info.msg_last_realtime_usec == 0 {
3002 return true;
3003 }
3004
3005 let first = info
3006 .msg_first_realtime_usec
3007 .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
3008 let last = info
3009 .msg_last_realtime_usec
3010 .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
3011
3012 if request
3013 .after_realtime_usec
3014 .is_some_and(|after| last < after)
3015 {
3016 return false;
3017 }
3018 if request
3019 .before_realtime_usec
3020 .is_some_and(|before| first > before)
3021 {
3022 return false;
3023 }
3024
3025 true
3026}
3027
3028fn collect_boot_first_realtime(
3029 paths: &[PathBuf],
3030 reader_options: ReaderOptions,
3031 needed_boot_ids: &BTreeSet<Vec<u8>>,
3032) -> BTreeMap<Vec<u8>, u64> {
3033 let mut out = BTreeMap::new();
3034 if needed_boot_ids.is_empty() {
3035 return out;
3036 }
3037 for path in paths {
3038 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
3039 continue;
3040 };
3041 let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
3042 continue;
3043 };
3044 for boot_id in boot_ids {
3045 if !needed_boot_ids.contains(&boot_id) {
3046 continue;
3047 }
3048 let mut match_payload = b"_BOOT_ID=".to_vec();
3049 match_payload.extend_from_slice(&boot_id);
3050 reader.flush_matches();
3051 reader.add_match(&match_payload);
3052 reader.seek_head();
3053 if !reader.next().unwrap_or(false) {
3054 continue;
3055 }
3056 if let Ok(realtime) = reader.get_realtime_usec() {
3057 record_boot_first_realtime(&mut out, boot_id, realtime);
3058 }
3059 }
3060 reader.flush_matches();
3061 }
3062 out
3063}
3064
3065fn response_boot_ids(
3066 column_order: &[String],
3067 rows: &[LocatedRow],
3068 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3069 histogram: Option<&ExplorerHistogram>,
3070) -> BTreeSet<Vec<u8>> {
3071 let mut boot_ids = BTreeSet::new();
3072 let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3073 if row_needs_boot_id {
3074 for row in rows {
3075 if let Some(values) = row_fields(row).get("_BOOT_ID") {
3076 boot_ids.extend(values.iter().cloned());
3077 }
3078 }
3079 }
3080 if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3081 boot_ids.extend(
3082 values
3083 .keys()
3084 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3085 .cloned(),
3086 );
3087 }
3088 if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3089 for bucket in &histogram.buckets {
3090 boot_ids.extend(
3091 bucket
3092 .values
3093 .keys()
3094 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3095 .cloned(),
3096 );
3097 }
3098 }
3099 boot_ids
3100}
3101
/// Records `realtime_usec` for `boot_id`, keeping the smallest timestamp seen
/// so far for that boot.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
3112
3113fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3114 let mut fields = BTreeMap::new();
3115 for payload in &row.row.payloads {
3116 let Some((field, value)) = split_payload(payload) else {
3117 continue;
3118 };
3119 fields
3120 .entry(String::from_utf8_lossy(field).into_owned())
3121 .or_insert_with(Vec::new)
3122 .push(value.to_vec());
3123 }
3124 fields.insert(
3125 "ND_JOURNAL_FILE".to_string(),
3126 vec![row.file_path.display().to_string().into_bytes()],
3127 );
3128 if !fields.contains_key("ND_JOURNAL_PROCESS") {
3129 let process = dynamic_process_name(&fields);
3130 if !process.is_empty() {
3131 fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3132 }
3133 }
3134 fields
3135}
3136
3137fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
3138 let base = first_value(fields, "CONTAINER_NAME")
3139 .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
3140 .or_else(|| first_value(fields, "_COMM"))
3141 .map(|value| String::from_utf8_lossy(value).into_owned())
3142 .unwrap_or_default();
3143 if base.is_empty() {
3144 return "-".to_string();
3145 }
3146 match first_value(fields, "_PID") {
3147 Some(pid) if !pid.is_empty() => {
3148 let pid = String::from_utf8_lossy(pid);
3149 format!("{base}[{pid}]")
3150 }
3151 Some(_) => base,
3152 None => format!("{base}[-]"),
3153 }
3154}
3155
/// Returns the first value stored for `field`, or `None` when the field is
/// absent or has no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    let first = values.first()?;
    Some(first.as_slice())
}
3162
/// Splits a `FIELD=value` payload at the first `=` into `(field, value)`.
///
/// The separator is not part of either half. Returns `None` when the payload
/// contains no `=`.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    // `rest` starts with the separator itself; skip it.
    Some((field, &rest[1..]))
}
3167
3168fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3169 let mut last_from = 0u64;
3170 let mut last_to = 0u64;
3171 let mut initialized = false;
3172 for row in rows {
3173 let timestamp = row.row.realtime_usec;
3174 if initialized && timestamp >= last_from && timestamp <= last_to {
3175 match direction {
3176 Direction::Backward => {
3177 last_from = last_from.saturating_sub(1);
3178 row.row.realtime_usec = last_from;
3179 }
3180 Direction::Forward => {
3181 last_to = last_to.saturating_add(1);
3182 row.row.realtime_usec = last_to;
3183 }
3184 }
3185 } else {
3186 last_from = timestamp;
3187 last_to = timestamp;
3188 initialized = true;
3189 }
3190 }
3191}
3192
3193fn column_metadata(key: &str, index: usize) -> Value {
3194 let (visible, filter, full_width) = match key {
3195 "timestamp" => (true, "range", false),
3196 "rowOptions" => (false, "none", false),
3197 "_HOSTNAME" => (true, "facet", false),
3198 "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3199 "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3200 _ if systemd_column_is_facet(key) => (false, "facet", false),
3201 _ => (false, "none", false),
3202 };
3203 let column_type = if key == "timestamp" {
3204 "timestamp"
3205 } else if key == "rowOptions" {
3206 "none"
3207 } else {
3208 "string"
3209 };
3210 let visualization = if key == "rowOptions" {
3211 "rowOptions"
3212 } else {
3213 "value"
3214 };
3215 let mut metadata = json!({
3216 "index": index,
3217 "unique_key": key == "timestamp",
3218 "name": if key == "timestamp" { "Timestamp" } else { key },
3219 "visible": visible,
3220 "type": column_type,
3221 "visualization": visualization,
3222 "value_options": {
3223 "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3224 "decimal_points": 0,
3225 "default_value": if key == "timestamp" || key == "rowOptions" {
3226 Value::Null
3227 } else {
3228 Value::String("-".to_string())
3229 },
3230 },
3231 "sort": "ascending",
3232 "sortable": false,
3233 "sticky": false,
3234 "summary": "count",
3235 "filter": filter,
3236 "full_width": full_width,
3237 "wrap": key != "rowOptions",
3238 "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3239 });
3240 if key == "rowOptions" {
3241 if let Some(object) = metadata.as_object_mut() {
3242 object.insert("dummy".to_string(), Value::Bool(true));
3243 }
3244 }
3245 metadata
3246}
3247
/// Decides whether a journal field should be offered as a facet.
///
/// `MESSAGE_ID` is always a facet. Otherwise a field is excluded when its name contains
/// `MESSAGE` or `TIMESTAMP`, or starts with `__`; everything else is a facet.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
3257
3258fn sort_facet_options(field: &str, options: &mut [Value]) {
3259 options.sort_by(|left, right| {
3260 let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3261 let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3262 if field == "PRIORITY" {
3263 return parse_priority(left_id).cmp(&parse_priority(right_id));
3264 }
3265 let left_count = left
3266 .get("count")
3267 .and_then(Value::as_u64)
3268 .unwrap_or_default();
3269 let right_count = right
3270 .get("count")
3271 .and_then(Value::as_u64)
3272 .unwrap_or_default();
3273 right_count
3274 .cmp(&left_count)
3275 .then_with(|| left_id.cmp(right_id))
3276 });
3277}
3278
3279fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3280 let bytes = query.as_bytes();
3281 let mut index = 0usize;
3282 let mut terms = Vec::new();
3283 let mut positives = Vec::new();
3284 let mut negatives = Vec::new();
3285
3286 while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3287 push_fts_pattern(
3288 pattern,
3289 negative,
3290 &mut terms,
3291 &mut positives,
3292 &mut negatives,
3293 );
3294 }
3295
3296 (terms, positives, negatives)
3297}
3298
3299fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3300 while *index < bytes.len() {
3301 skip_fts_separators(bytes, index);
3302 let negative = consume_fts_negative_marker(bytes, index);
3303 let pattern = read_fts_pattern(bytes, index);
3304 if !pattern.is_empty() {
3305 return Some((pattern, negative));
3306 }
3307 }
3308 None
3309}
3310
/// Advances `*index` past any run of `|` separators starting at the current position.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    let remaining: &[u8] = bytes.get(*index..).unwrap_or(&[]);
    let separators = remaining.iter().take_while(|byte| **byte == b'|').count();
    *index += separators;
}
3316
/// Consumes a `!` at `*index` if present and reports whether one was found.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let negated = matches!(bytes.get(*index), Some(b'!'));
    if negated {
        *index += 1;
    }
    negated
}
3325
/// Reads one pattern starting at `*index`, stopping after the next unescaped `|`.
///
/// A backslash escapes the following byte (so `\|` yields a literal `|` and `\\` a literal
/// backslash); the escaping backslash itself is dropped. `*index` ends just past the consumed
/// separator, or at the end of input.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut collected = Vec::new();
    let mut pending_escape = false;
    while let Some(&current) = bytes.get(*index) {
        *index += 1;
        match (current, pending_escape) {
            (b'\\', false) => pending_escape = true,
            (b'|', false) => break,
            _ => {
                collected.push(current);
                pending_escape = false;
            }
        }
    }
    collected
}
3344
3345fn push_fts_pattern(
3346 pattern: Vec<u8>,
3347 negative: bool,
3348 terms: &mut Vec<ExplorerFtsPattern>,
3349 positives: &mut Vec<Vec<u8>>,
3350 negatives: &mut Vec<Vec<u8>>,
3351) {
3352 terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3353 if negative {
3354 negatives.push(pattern);
3355 } else {
3356 positives.push(pattern);
3357 }
3358}
3359
3360fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3361 let Some(Value::Object(selections)) = value else {
3362 return Vec::new();
3363 };
3364 let mut filters = Vec::new();
3365 for (field, values) in selections {
3366 if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3367 continue;
3368 }
3369 let Some(values) = parse_string_array(Some(values)) else {
3370 continue;
3371 };
3372 filters.push(ExplorerFilter::new(
3373 field.as_bytes().to_vec(),
3374 values
3375 .into_iter()
3376 .map(|value| normalize_filter_value(field, &value)),
3377 ));
3378 }
3379 filters
3380}
3381
/// Journal sources requested through the `__logs_sources` selection.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags for every recognized category name;
    /// `SOURCE_TYPE_ALL` when the request carries no usable source selection.
    source_type: u64,
    /// Requested names that did not match a known category, kept verbatim.
    exact_sources: Vec<String>,
}
3387
3388fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3389 let mut selection = SourceSelection {
3390 source_type: SOURCE_TYPE_ALL,
3391 exact_sources: Vec::new(),
3392 };
3393 let Some(Value::Object(selections)) = value else {
3394 return selection;
3395 };
3396 let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3397 return selection;
3398 };
3399 selection.source_type = 0;
3400 for value in values {
3401 match source_type_for_name(&value) {
3402 Some(source_type) => selection.source_type |= source_type,
3403 None => selection.exact_sources.push(value),
3404 }
3405 }
3406 selection
3407}
3408
3409fn source_type_for_name(value: &str) -> Option<u64> {
3410 match value {
3411 "all" => Some(SOURCE_TYPE_ALL),
3412 "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3413 "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3414 "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3415 "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3416 "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3417 "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3418 _ => None,
3419 }
3420}
3421
3422fn journal_file_source_type(path: &Path) -> u64 {
3423 let text = path.to_string_lossy();
3424 let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3425 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3426 };
3427 if text.contains("/remote/") {
3428 return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3429 }
3430 if local_namespace_source_name(path).is_some() {
3431 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3432 }
3433 if name.starts_with("system") {
3434 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3435 }
3436 if name.starts_with("user") {
3437 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3438 }
3439 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3440}
3441
/// Derives a `namespace-<name>` source from the file's parent directory.
///
/// The namespace is the text after the last `.` in the parent directory's name; `None` is
/// returned when there is no parent name, it is not UTF-8, it has no `.`, or the part after the
/// last `.` is empty.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path
        .parent()
        .and_then(Path::file_name)
        .and_then(|name| name.to_str())?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3447
3448fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3449 let text = path.to_string_lossy();
3450 if text.contains("/remote/") {
3451 let name = path.file_name()?.to_str()?;
3452 let source = name
3453 .split_once('@')
3454 .map(|(prefix, _)| prefix)
3455 .unwrap_or_else(|| {
3456 name.strip_suffix(".journal~.zst")
3457 .or_else(|| name.strip_suffix(".journal.zst"))
3458 .or_else(|| name.strip_suffix(".journal~"))
3459 .or_else(|| name.strip_suffix(".journal"))
3460 .unwrap_or(name)
3461 });
3462 return source.starts_with("remote-").then(|| source.to_string());
3463 }
3464 local_namespace_source_name(path)
3465}
3466
3467fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3468 if field == "PRIORITY" {
3469 if let Some(priority) = priority_name_to_number(value) {
3470 return priority.as_bytes().to_vec();
3471 }
3472 }
3473 value.as_bytes().to_vec()
3474}
3475
3476fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3477 let Value::Array(items) = value? else {
3478 return None;
3479 };
3480 Some(
3481 items
3482 .iter()
3483 .filter_map(Value::as_str)
3484 .map(ToOwned::to_owned)
3485 .collect(),
3486 )
3487}
3488
3489fn request_direction(object: &Map<String, Value>) -> Direction {
3490 match get_str(object, "direction").unwrap_or("backward") {
3491 "forward" | "forwards" | "next" => Direction::Forward,
3492 _ => Direction::Backward,
3493 }
3494}
3495
3496fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3497 data_only && get_bool(object, "delta").unwrap_or(false)
3498}
3499
3500fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3501 data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3502}
3503
3504fn tail_after_realtime_bound(
3505 after_realtime_usec: Option<u64>,
3506 anchor: ExplorerAnchor,
3507) -> Option<u64> {
3508 let ExplorerAnchor::Realtime(anchor) = anchor else {
3509 return after_realtime_usec;
3510 };
3511 let tail_after = anchor.saturating_add(1);
3512 Some(
3513 after_realtime_usec
3514 .map(|after| after.max(tail_after))
3515 .unwrap_or(tail_after),
3516 )
3517}
3518
3519fn before_realtime_bound_excluding_anchor(
3520 before_realtime_usec: Option<u64>,
3521 anchor: ExplorerAnchor,
3522) -> Option<u64> {
3523 let ExplorerAnchor::Realtime(anchor) = anchor else {
3524 return before_realtime_usec;
3525 };
3526 let before_anchor = anchor.saturating_sub(1);
3527 Some(
3528 before_realtime_usec
3529 .map(|before| before.min(before_anchor))
3530 .unwrap_or(before_anchor),
3531 )
3532}
3533
3534fn request_anchor_and_direction(
3535 object: &Map<String, Value>,
3536 tail: bool,
3537 direction: Direction,
3538 after_realtime_usec: Option<u64>,
3539 before_realtime_usec: Option<u64>,
3540) -> (ExplorerAnchor, Direction) {
3541 let anchor = get_u64(object, "anchor")
3542 .filter(|value| *value != 0)
3543 .map(normalize_timestamp_to_usec)
3544 .map(ExplorerAnchor::Realtime)
3545 .unwrap_or(ExplorerAnchor::Auto);
3546 if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3547 return (anchor, Direction::Backward);
3548 }
3549 if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3550 (ExplorerAnchor::Auto, Direction::Backward)
3551 } else {
3552 (anchor, direction)
3553 }
3554}
3555
3556fn anchor_outside_window(
3557 anchor: ExplorerAnchor,
3558 after_realtime_usec: Option<u64>,
3559 before_realtime_usec: Option<u64>,
3560) -> bool {
3561 let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3562 return false;
3563 };
3564 after_realtime_usec.is_some_and(|after| anchor_usec < after)
3565 || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3566}
3567
3568fn request_limit(object: &Map<String, Value>) -> usize {
3569 get_u64(object, "last")
3570 .filter(|value| *value != 0)
3571 .map(|value| value as usize)
3572 .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3573}
3574
3575fn request_facets(
3576 requested_facets: &Option<Vec<String>>,
3577 config: &NetdataFunctionConfig,
3578) -> Vec<Vec<u8>> {
3579 requested_facets
3580 .clone()
3581 .unwrap_or_else(|| config.default_facets.clone())
3582 .into_iter()
3583 .map(Vec::from)
3584 .collect()
3585}
3586
3587fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3588 get_str(object, "histogram")
3589 .filter(|histogram| !histogram.is_empty())
3590 .map(ToOwned::to_owned)
3591}
3592
3593fn request_histogram_or_default(
3594 requested_histogram: &Option<String>,
3595 config: &NetdataFunctionConfig,
3596) -> Option<String> {
3597 requested_histogram
3598 .clone()
3599 .or_else(|| config.default_histogram.clone())
3600}
3601
3602fn request_query(object: &Map<String, Value>) -> Option<String> {
3603 get_str(object, "query")
3604 .filter(|query| !query.is_empty())
3605 .map(ToOwned::to_owned)
3606}
3607
3608fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3609 object.get(key).and_then(Value::as_bool)
3610}
3611
3612fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3613 object.get(key).and_then(Value::as_i64)
3614}
3615
3616fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3617 object.get(key).and_then(Value::as_u64)
3618}
3619
3620fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3621 object.get(key).and_then(Value::as_str)
3622}
3623
3624fn normalize_time_window(
3625 now_seconds: i64,
3626 after: Option<i64>,
3627 before: Option<i64>,
3628) -> (Option<u64>, Option<u64>) {
3629 let mut after = after.unwrap_or(0);
3630 let mut before = before.unwrap_or(0);
3631
3632 if after == 0 && before == 0 {
3633 before = now_seconds;
3634 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3635 } else {
3636 (after, before) = relative_window_to_absolute(now_seconds, after, before);
3637 }
3638
3639 if after > before {
3640 std::mem::swap(&mut after, &mut before);
3641 }
3642 if after == before {
3643 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3644 }
3645
3646 (
3647 Some(normalize_timestamp_to_usec_with_rounding(
3648 after.max(0) as u64,
3649 false,
3650 )),
3651 Some(normalize_timestamp_to_usec_with_rounding(
3652 before.max(0) as u64,
3653 true,
3654 )),
3655 )
3656}
3657
3658fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3659 let mut after = after;
3660 let mut before = before;
3661
3662 if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3663 if before > 0 {
3664 before = -before;
3665 }
3666 before = now_seconds.saturating_add(before);
3667 }
3668
3669 if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3670 if after > 0 {
3671 after = -after;
3672 }
3673 if after == 0 {
3674 after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3675 }
3676 after = before.saturating_add(after).saturating_add(1);
3677 }
3678
3679 if after > before {
3680 std::mem::swap(&mut after, &mut before);
3681 }
3682
3683 if before > now_seconds {
3684 let delta = before.saturating_sub(now_seconds);
3685 before = before.saturating_sub(delta);
3686 after = after.saturating_sub(delta);
3687 }
3688
3689 (after, before)
3690}
3691
/// Borrowed, already-normalized request parameters that `normalized_request_echo` renders as
/// the JSON echo of a request.
struct RequestEchoInput<'a> {
    info: bool,
    /// Window start in microseconds; echoed as whole seconds (`0` when absent).
    after_realtime_usec: Option<u64>,
    /// Window end in microseconds; echoed as whole seconds (`0` when absent).
    before_realtime_usec: Option<u64>,
    if_modified_since_usec: u64,
    /// Echoed as its microsecond value for a realtime anchor and as `0` for any other kind.
    anchor: ExplorerAnchor,
    direction: Direction,
    /// Echoed under the `last` key.
    limit: usize,
    data_only: bool,
    delta: bool,
    tail: bool,
    sampling: u64,
    source_type: u64,
    /// Echoed under `facets` only when present.
    requested_facets: Option<&'a [String]>,
    /// Echoed under `selections` only when it is a JSON object.
    selections: Option<&'a Value>,
    histogram: Option<&'a str>,
    query: Option<&'a str>,
}
3710
3711fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3712 let anchor_usec = match input.anchor {
3713 ExplorerAnchor::Realtime(usec) => usec,
3714 ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3715 };
3716 let mut out = json!({
3717 "info": input.info,
3718 "slice": true,
3722 "data_only": input.data_only,
3723 "delta": input.delta,
3724 "tail": input.tail,
3725 "sampling": input.sampling,
3726 "source_type": input.source_type,
3727 "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3728 "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3729 "if_modified_since": input.if_modified_since_usec,
3730 "anchor": anchor_usec,
3731 "direction": match input.direction {
3732 Direction::Forward => "forward",
3733 Direction::Backward => "backward",
3734 },
3735 "last": input.limit,
3736 "query": input.query,
3737 "histogram": input.histogram,
3738 });
3739 if let Some(facets) = input.requested_facets {
3740 if let Some(object) = out.as_object_mut() {
3741 object.insert(
3742 "facets".to_string(),
3743 facets
3744 .iter()
3745 .map(|field| Value::String(field.clone()))
3746 .collect(),
3747 );
3748 }
3749 }
3750 if let Some(Value::Object(selections)) = input.selections {
3751 let mut selections = selections.clone();
3752 if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3753 for source in sources {
3754 *source = Value::Null;
3755 }
3756 }
3757 if let Some(object) = out.as_object_mut() {
3758 object.insert("selections".to_string(), Value::Object(selections));
3759 }
3760 }
3761 out
3762}
3763
3764fn normalize_timestamp_to_usec(value: u64) -> u64 {
3765 normalize_timestamp_to_usec_with_rounding(value, false)
3766}
3767
/// Normalizes a timestamp to microseconds.
///
/// Values of at least `1_000_000_000_000` are returned unchanged. Smaller values are taken as
/// seconds and scaled by one million (saturating); with `end_of_second` the result points at
/// the last microsecond of that second instead of the first.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const PASS_THROUGH_THRESHOLD: u64 = 1_000_000_000_000;
    if value >= PASS_THROUGH_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(1_000_000);
    if end_of_second {
        start_of_second.saturating_add(999_999)
    } else {
        start_of_second
    }
}
3777
/// Returns the current wall-clock time as whole seconds since the Unix epoch, or `0` when the
/// system clock reports a time before the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3784
/// Formats a byte count with binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Values below 1024 are printed exactly in bytes. Larger values are scaled to the largest
/// fitting unit (capped at `TiB`) and printed with up to two decimals, with trailing zeros and
/// a trailing decimal point removed.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];

    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index < UNITS.len() - 1 {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let unit = UNITS[unit_index];

    if unit_index == 0 {
        return format!("{bytes}{unit}");
    }
    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{unit}");
    }
    // Two decimals always include a '.', so trimming zeros can never eat integer digits.
    let rendered = format!("{scaled:.2}");
    let trimmed = rendered.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{unit}")
}
3808
/// Formats a duration in seconds as space-separated components such as `1y 2mo 3d 4h 5m 6s`.
///
/// Years count 365 days and months 30 days. Zero components are omitted, except that a
/// duration with no other component is printed as its seconds (so `0` becomes `0s`).
fn human_duration_seconds(seconds: u64) -> String {
    let larger_units: [(u64, &str); 5] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for (unit_seconds, suffix) in larger_units {
        let count = remaining / unit_seconds;
        remaining %= unit_seconds;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3840
/// Outcome of scanning a directory tree for journal files.
#[derive(Debug, Default)]
struct JournalFileCollection {
    /// Paths of the journal files that were found.
    files: Vec<PathBuf>,
    /// Number of directories that were skipped (unreadable, or past the scan limit).
    skipped: u64,
    /// One human-readable message per skipped directory.
    errors: Vec<String>,
}
3847
/// Recursively collects journal files under `path`.
///
/// Returns `SdkError::InvalidPath` when `path` is not a directory, and propagates the I/O error
/// when the root directory itself cannot be listed. Failures on nested directories are recorded
/// in the returned collection (`skipped` / `errors`) instead of aborting the scan. The resulting
/// file list is sorted and de-duplicated by canonical path.
fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
    if !path.is_dir() {
        return Err(SdkError::InvalidPath(format!(
            "not a directory: {}",
            path.display()
        )));
    }
    let mut collection = JournalFileCollection::default();
    // Queue of (directory, depth) pairs, consumed in first-in, first-out order.
    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
    // Directories already scanned, keyed by canonical path when it can be resolved.
    let mut visited = HashSet::new();
    while let Some((directory, depth)) = pending.pop_front() {
        // Fall back to the path as given when canonicalization fails.
        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
        if visited.contains(&visited_key) {
            continue;
        }
        // Cap the number of scanned directories; anything beyond is reported and skipped.
        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
            collection.skipped = collection.skipped.saturating_add(1);
            collection.errors.push(format!(
                "{}: directory scan limit reached",
                directory.display()
            ));
            continue;
        }
        visited.insert(visited_key);
        let entries = match std::fs::read_dir(&directory) {
            Ok(entries) => entries,
            // Failing to list the root directory is fatal.
            Err(err) if directory == path => return Err(err.into()),
            // Failing to list a nested directory is only recorded.
            Err(err) => {
                collection.skipped = collection.skipped.saturating_add(1);
                collection
                    .errors
                    .push(format!("{}: {err}", directory.display()));
                continue;
            }
        };
        // `flatten()` drops directory entries that could not be read.
        for entry in entries.flatten() {
            let entry_path = entry.path();
            if entry_path.is_file() && is_journal_file_name(&entry_path) {
                collection.files.push(entry_path);
            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
                // Subdirectories are queued only while below the maximum depth.
                pending.push_back((entry_path, depth + 1));
            }
        }
    }
    collection.files.sort();
    dedup_journal_files_by_canonical_path(&mut collection.files);
    Ok(collection)
}
3896
/// Removes later entries that resolve to the same canonical path as an earlier one.
///
/// Paths that cannot be canonicalized are compared as given. The order of the kept entries is
/// preserved.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut already_kept = HashSet::new();
    files.retain(|candidate| {
        let identity = match std::fs::canonicalize(candidate) {
            Ok(canonical) => canonical,
            Err(_) => candidate.clone(),
        };
        already_kept.insert(identity)
    });
}
3904
/// Timestamps describing a journal file; `compare_journal_file_order` orders files by the first
/// three fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime timestamp of the first message; `0` when the file could not be opened.
    msg_first_realtime_usec: u64,
    /// Realtime timestamp of the last message, falling back to the file modification time.
    msg_last_realtime_usec: u64,
    /// File modification time in microseconds since the Unix epoch (`0` when unavailable).
    file_last_modified_usec: u64,
    /// Delta taken from the file's metadata when present, otherwise the default delta.
    journal_vs_realtime_delta_usec: u64,
}
3912
3913fn journal_file_order_info(
3914 path: &Path,
3915 reader_options: ReaderOptions,
3916 metadata: Option<&NetdataJournalFileMetadata>,
3917) -> JournalFileOrderInfo {
3918 let file_last_modified_usec = std::fs::metadata(path)
3919 .ok()
3920 .and_then(|metadata| metadata.modified().ok())
3921 .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3922 .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3923 .unwrap_or_default();
3924 let file_last_modified_usec = metadata
3925 .and_then(|metadata| metadata.file_last_modified_usec)
3926 .unwrap_or(file_last_modified_usec);
3927 let journal_vs_realtime_delta_usec = metadata
3928 .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3929 .map(normalize_journal_vs_realtime_delta_usec)
3930 .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3931
3932 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3933 return JournalFileOrderInfo {
3934 msg_first_realtime_usec: 0,
3935 msg_last_realtime_usec: file_last_modified_usec,
3936 file_last_modified_usec,
3937 journal_vs_realtime_delta_usec,
3938 };
3939 };
3940 let header = reader.header();
3941 let msg_first_realtime_usec = metadata
3942 .and_then(|metadata| metadata.msg_first_realtime_usec)
3943 .unwrap_or(header.head_entry_realtime);
3944 let msg_last_realtime_usec = metadata
3945 .and_then(|metadata| metadata.msg_last_realtime_usec)
3946 .unwrap_or_else(|| {
3947 if header.tail_entry_realtime == 0 {
3948 file_last_modified_usec
3949 } else {
3950 header.tail_entry_realtime
3951 }
3952 });
3953 JournalFileOrderInfo {
3954 msg_first_realtime_usec,
3955 msg_last_realtime_usec,
3956 file_last_modified_usec,
3957 journal_vs_realtime_delta_usec,
3958 }
3959}
3960
3961fn compare_journal_file_order(
3962 left: &JournalFileOrderInfo,
3963 right: &JournalFileOrderInfo,
3964 direction: Direction,
3965) -> Ordering {
3966 let backward = right
3967 .msg_last_realtime_usec
3968 .cmp(&left.msg_last_realtime_usec)
3969 .then_with(|| {
3970 right
3971 .file_last_modified_usec
3972 .cmp(&left.file_last_modified_usec)
3973 })
3974 .then_with(|| {
3975 right
3976 .msg_first_realtime_usec
3977 .cmp(&left.msg_first_realtime_usec)
3978 });
3979 match direction {
3980 Direction::Backward => backward,
3981 Direction::Forward => backward.reverse(),
3982 }
3983}
3984
/// Reports whether the path's file name ends with one of the journal suffixes
/// (`.journal`, `.journal~`, `.journal.zst`, `.journal~.zst`). Paths without a UTF-8 file name
/// never match.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] = [".journal", ".journal~", ".journal.zst", ".journal~.zst"];
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => JOURNAL_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3995
3996fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3997 for value in values {
3998 push_unique(target, value);
3999 }
4000}
4001
/// Converts byte fields to strings, dropping any field that is not valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    let mut decoded = Vec::new();
    for field in fields {
        if let Ok(text) = std::str::from_utf8(field) {
            decoded.push(text.to_owned());
        }
    }
    decoded
}
4008
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate: &str = value.as_ref();
    let already_present = target
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
4015
/// Builds a sort key from a name by stripping leading ASCII punctuation and lowercasing ASCII
/// letters.
fn netdata_reorder_key(value: &str) -> String {
    let first_kept = value
        .find(|character: char| !character.is_ascii_punctuation())
        .unwrap_or(value.len());
    value[first_kept..].to_ascii_lowercase()
}
4021
4022fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
4023 histogram
4024 .buckets
4025 .first()
4026 .map(|bucket| {
4027 bucket
4028 .end_realtime_usec
4029 .saturating_sub(bucket.start_realtime_usec)
4030 .checked_div(1_000_000)
4031 .unwrap_or(1)
4032 .max(1)
4033 })
4034 .unwrap_or(1)
4035}
4036
/// Precision used by `format_realtime_usec` when rendering a timestamp.
enum TimestampPrecision {
    /// Whole seconds, e.g. `2024-01-01T00:00:00Z`.
    Seconds,
    /// Six fractional digits, e.g. `2024-01-01T00:00:00.000000Z`.
    Micros,
}
4041
4042fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
4043 let seconds = (timestamp / 1_000_000) as i64;
4044 let micros = (timestamp % 1_000_000) as u32;
4045 DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4046 .map(|datetime| match precision {
4047 TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4048 TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4049 })
4050 .unwrap_or_else(|| timestamp.to_string())
4051}
4052
4053fn priority_name(raw: &str) -> Option<&'static str> {
4054 match parse_priority(raw)? {
4055 0 => Some("panic"),
4056 1 => Some("alert"),
4057 2 => Some("critical"),
4058 3 => Some("error"),
4059 4 => Some("warning"),
4060 5 => Some("notice"),
4061 6 => Some("info"),
4062 7 => Some("debug"),
4063 _ => None,
4064 }
4065}
4066
/// Maps a priority name or one of its accepted aliases to the priority number as a string.
/// Matching is exact and case-sensitive; unknown names yield `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: &[(&str, &str)] = &[
        ("panic", "0"),
        ("emergency", "0"),
        ("emerg", "0"),
        ("alert", "1"),
        ("critical", "2"),
        ("crit", "2"),
        ("error", "3"),
        ("err", "3"),
        ("warning", "4"),
        ("warn", "4"),
        ("notice", "5"),
        ("info", "6"),
        ("debug", "7"),
    ];
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == value)
        .map(|(_, number)| *number)
}
4080
/// Parses a priority given as a decimal `u8`; returns `None` for anything that does not parse.
fn parse_priority(raw: &str) -> Option<u8> {
    match raw.parse::<u8>() {
        Ok(priority) => Some(priority),
        Err(_) => None,
    }
}
4084
4085fn priority_to_row_severity(raw: &[u8]) -> &'static str {
4086 let raw = String::from_utf8_lossy(raw);
4087 match parse_priority(&raw) {
4088 Some(priority) if priority <= 3 => "critical",
4089 Some(4) => "warning",
4090 Some(5) => "notice",
4091 Some(priority) if priority >= 7 => "debug",
4092 _ => "normal",
4093 }
4094}
4095
/// Maps a numeric syslog facility string to its name.
///
/// Facilities 0–11 have individual names and 16–23 are `local0`–`local7`; any other number or
/// unparsable input yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    const STANDARD: [&str; 12] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron",
        "authpriv", "ftp",
    ];
    const LOCAL: [&str; 8] = [
        "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7",
    ];

    let facility = usize::from(raw.parse::<u8>().ok()?);
    match facility {
        0..=11 => Some(STANDARD[facility]),
        16..=23 => Some(LOCAL[facility - 16]),
        _ => None,
    }
}
4121
/// Lookup table from errno numbers to their symbolic names, searched by `errno_name`.
///
/// Entries are listed in ascending numeric order; numbers 41 and 58 have no entry.
/// NOTE(review): the numbering looks like the Linux generic errno assignment — confirm before
/// relying on it for journals written on other platforms.
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"),
    (2, "ENOENT"),
    (3, "ESRCH"),
    (4, "EINTR"),
    (5, "EIO"),
    (6, "ENXIO"),
    (7, "E2BIG"),
    (8, "ENOEXEC"),
    (9, "EBADF"),
    (10, "ECHILD"),
    (11, "EAGAIN"),
    (12, "ENOMEM"),
    (13, "EACCES"),
    (14, "EFAULT"),
    (15, "ENOTBLK"),
    (16, "EBUSY"),
    (17, "EEXIST"),
    (18, "EXDEV"),
    (19, "ENODEV"),
    (20, "ENOTDIR"),
    (21, "EISDIR"),
    (22, "EINVAL"),
    (23, "ENFILE"),
    (24, "EMFILE"),
    (25, "ENOTTY"),
    (26, "ETXTBSY"),
    (27, "EFBIG"),
    (28, "ENOSPC"),
    (29, "ESPIPE"),
    (30, "EROFS"),
    (31, "EMLINK"),
    (32, "EPIPE"),
    (33, "EDOM"),
    (34, "ERANGE"),
    (35, "EDEADLK"),
    (36, "ENAMETOOLONG"),
    (37, "ENOLCK"),
    (38, "ENOSYS"),
    (39, "ENOTEMPTY"),
    (40, "ELOOP"),
    (42, "ENOMSG"),
    (43, "EIDRM"),
    (44, "ECHRNG"),
    (45, "EL2NSYNC"),
    (46, "EL3HLT"),
    (47, "EL3RST"),
    (48, "ELNRNG"),
    (49, "EUNATCH"),
    (50, "ENOCSI"),
    (51, "EL2HLT"),
    (52, "EBADE"),
    (53, "EBADR"),
    (54, "EXFULL"),
    (55, "ENOANO"),
    (56, "EBADRQC"),
    (57, "EBADSLT"),
    (59, "EBFONT"),
    (60, "ENOSTR"),
    (61, "ENODATA"),
    (62, "ETIME"),
    (63, "ENOSR"),
    (64, "ENONET"),
    (65, "ENOPKG"),
    (66, "EREMOTE"),
    (67, "ENOLINK"),
    (68, "EADV"),
    (69, "ESRMNT"),
    (70, "ECOMM"),
    (71, "EPROTO"),
    (72, "EMULTIHOP"),
    (73, "EDOTDOT"),
    (74, "EBADMSG"),
    (75, "EOVERFLOW"),
    (76, "ENOTUNIQ"),
    (77, "EBADFD"),
    (78, "EREMCHG"),
    (79, "ELIBACC"),
    (80, "ELIBBAD"),
    (81, "ELIBSCN"),
    (82, "ELIBMAX"),
    (83, "ELIBEXEC"),
    (84, "EILSEQ"),
    (85, "ERESTART"),
    (86, "ESTRPIPE"),
    (87, "EUSERS"),
    (88, "ENOTSOCK"),
    (89, "EDESTADDRREQ"),
    (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"),
    (92, "ENOPROTOOPT"),
    (93, "EPROTONOSUPPORT"),
    (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"),
    (96, "EPFNOSUPPORT"),
    (97, "EAFNOSUPPORT"),
    (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"),
    (100, "ENETDOWN"),
    (101, "ENETUNREACH"),
    (102, "ENETRESET"),
    (103, "ECONNABORTED"),
    (104, "ECONNRESET"),
    (105, "ENOBUFS"),
    (106, "EISCONN"),
    (107, "ENOTCONN"),
    (108, "ESHUTDOWN"),
    (109, "ETOOMANYREFS"),
    (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"),
    (112, "EHOSTDOWN"),
    (113, "EHOSTUNREACH"),
    (114, "EALREADY"),
    (115, "EINPROGRESS"),
    (116, "ESTALE"),
    (117, "EUCLEAN"),
    (118, "ENOTNAM"),
    (119, "ENAVAIL"),
    (120, "EISNAM"),
    (121, "EREMOTEIO"),
    (122, "EDQUOT"),
    (123, "ENOMEDIUM"),
    (124, "EMEDIUMTYPE"),
    (125, "ECANCELED"),
    (126, "ENOKEY"),
    (127, "EKEYEXPIRED"),
    (128, "EKEYREVOKED"),
    (129, "EKEYREJECTED"),
    (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"),
    (132, "ERFKILL"),
    (133, "EHWPOISON"),
];
4255
4256fn errno_name(raw: &str) -> Option<String> {
4257 let errno = raw.parse::<u32>().ok()?;
4258 let name = ERRNO_NAMES
4259 .iter()
4260 .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
4261 Some(format!("{errno} ({name})"))
4262}
4263
/// Expands a hexadecimal capability mask into `"<raw> (NAME | NAME | ...)"`.
///
/// The input is returned unchanged when it does not start with an ASCII digit, does not parse
/// as a hexadecimal `u64`, is zero, or sets none of the known capability bits. Bit `n` of the
/// mask corresponds to the `n`-th entry of the capability list.
fn cap_effective_display(raw: &str) -> String {
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    if !starts_with_digit {
        return raw.to_string();
    }
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names: Vec<&str> = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if ((mask >> bit) & 1) == 1 {
            names.push(*name);
        }
    }
    if names.is_empty() {
        return raw.to_string();
    }
    format!("{raw} ({})", names.join(" | "))
}
4328
4329fn systemd_field_display_value(
4330 context: &DisplayContext,
4331 scope: DisplayScope,
4332 field: &str,
4333 value: &[u8],
4334 resolve_user_group_names: bool,
4335) -> Value {
4336 let raw = String::from_utf8_lossy(value);
4337 match field {
4338 "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
4339 "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
4340 "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
4341 "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
4342 (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
4343 (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
4344 (None, _) => raw.into_owned(),
4345 }),
4346 "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
4347 (Some(timestamp), DisplayScope::Data) => format!(
4348 "{} ({}) ",
4349 raw,
4350 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4351 ),
4352 (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
4353 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4354 }
4355 (None, _) => raw.into_owned(),
4356 }),
4357 "_UID"
4358 | "_SYSTEMD_OWNER_UID"
4359 | "OBJECT_SYSTEMD_OWNER_UID"
4360 | "OBJECT_UID"
4361 | "_AUDIT_LOGINUID"
4362 | "OBJECT_AUDIT_LOGINUID" => {
4363 if resolve_user_group_names {
4364 Value::String(cached_uid_display(context, raw.as_ref()))
4365 } else {
4366 Value::String(raw.into_owned())
4367 }
4368 }
4369 "_GID" | "OBJECT_GID" => {
4370 if resolve_user_group_names {
4371 Value::String(cached_gid_display(context, raw.as_ref()))
4372 } else {
4373 Value::String(raw.into_owned())
4374 }
4375 }
4376 "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
4377 "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
4378 Ok(timestamp) if timestamp != 0 => {
4379 format!(
4380 "{} ({})",
4381 raw,
4382 format_realtime_usec(timestamp, TimestampPrecision::Micros)
4383 )
4384 }
4385 _ => raw.into_owned(),
4386 }),
4387 _ => Value::String(raw.into_owned()),
4388 }
4389}
4390
4391fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4392 if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4393 return value.clone();
4394 }
4395 let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4396 context
4397 .uid_display_cache
4398 .borrow_mut()
4399 .insert(raw.to_string(), value.clone());
4400 value
4401}
4402
4403fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4404 if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4405 return value.clone();
4406 }
4407 let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4408 context
4409 .gid_display_cache
4410 .borrow_mut()
4411 .insert(raw.to_string(), value.clone());
4412 value
4413}
4414
4415#[cfg(unix)]
4416fn resolve_uid_name(raw: &str) -> Option<String> {
4417 let uid = raw.parse::<libc::uid_t>().ok()?;
4418 let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4419 let mut result = std::ptr::null_mut();
4420 let mut buffer = vec![0i8; 16_384];
4421 let rc = unsafe {
4422 libc::getpwuid_r(
4423 uid,
4424 pwd.as_mut_ptr(),
4425 buffer.as_mut_ptr(),
4426 buffer.len(),
4427 &mut result,
4428 )
4429 };
4430 if rc != 0 || result.is_null() {
4431 return None;
4432 }
4433 let pwd = unsafe { pwd.assume_init() };
4434 Some(
4435 unsafe { CStr::from_ptr(pwd.pw_name) }
4436 .to_string_lossy()
4437 .into_owned(),
4438 )
4439}
4440
/// Non-Unix fallback: no user lookup is attempted, so this always returns `None`.
#[cfg(not(unix))]
fn resolve_uid_name(_raw: &str) -> Option<String> {
    None
}
4445
4446#[cfg(unix)]
4447fn resolve_gid_name(raw: &str) -> Option<String> {
4448 let gid = raw.parse::<libc::gid_t>().ok()?;
4449 let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4450 let mut result = std::ptr::null_mut();
4451 let mut buffer = vec![0i8; 16_384];
4452 let rc = unsafe {
4453 libc::getgrgid_r(
4454 gid,
4455 grp.as_mut_ptr(),
4456 buffer.as_mut_ptr(),
4457 buffer.len(),
4458 &mut result,
4459 )
4460 };
4461 if rc != 0 || result.is_null() {
4462 return None;
4463 }
4464 let grp = unsafe { grp.assume_init() };
4465 Some(
4466 unsafe { CStr::from_ptr(grp.gr_name) }
4467 .to_string_lossy()
4468 .into_owned(),
4469 )
4470}
4471
/// Non-Unix fallback: no group lookup is attempted, so this always returns `None`.
#[cfg(not(unix))]
fn resolve_gid_name(_raw: &str) -> Option<String> {
    None
}
4476
/// Display names for known `MESSAGE_ID` values.
///
/// Each entry pairs a 32-character lowercase hexadecimal ID with the label
/// shown for it. `message_id_name` searches this table by exact string match.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4792
4793fn message_id_name(raw: &str) -> Option<&'static str> {
4794 MESSAGE_ID_NAMES
4795 .iter()
4796 .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4797}
4798
4799#[cfg(test)]
4800mod tests {
4801 use super::*;
4802 use crate::ExplorerHistogramBucket;
4803 use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4804 use journal_core::repository::File as RepoFile;
4805 use std::cell::Cell;
4806 use std::collections::HashMap;
4807 use tempfile::TempDir;
4808
    /// In-memory `NetdataFunctionState` implementation used by these tests.
    #[derive(Default)]
    struct TestNetdataState {
        // Metadata returned by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        // Every `(path, delta_usec)` pair passed to
        // `update_file_journal_vs_realtime_delta_usec`, in call order.
        updates: Vec<(PathBuf, u64)>,
    }
4814
4815 impl NetdataFunctionState for TestNetdataState {
4816 fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4817 self.metadata.get(path).cloned()
4818 }
4819
4820 fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4821 self.updates.push((path.to_path_buf(), delta_usec));
4822 }
4823 }
4824
4825 fn test_uuid(seed: u8) -> uuid::Uuid {
4826 uuid::Uuid::from_bytes([seed; 16])
4827 }
4828
4829 fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4830 write_named_netdata_test_journal(
4831 directory,
4832 "netdata-api-test.journal",
4833 count,
4834 1_700_000_000_000_000,
4835 );
4836 }
4837
4838 fn write_named_netdata_test_journal(
4839 directory: &std::path::Path,
4840 name: &str,
4841 count: usize,
4842 start_realtime_usec: u64,
4843 ) {
4844 write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4845 }
4846
4847 fn write_stepped_netdata_test_journal(
4848 directory: &std::path::Path,
4849 name: &str,
4850 count: usize,
4851 start_realtime_usec: u64,
4852 step_realtime_usec: u64,
4853 ) {
4854 std::fs::create_dir_all(directory).expect("create journal dir");
4855 let path = directory.join(name);
4856 let repo_file = RepoFile::from_path(&path).expect("repo file");
4857 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4858 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4859 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4860 for index in 0..count {
4861 let message = format!("MESSAGE=row-{index}");
4862 let service = if index % 2 == 0 {
4863 b"SERVICE=even".as_slice()
4864 } else {
4865 b"SERVICE=odd".as_slice()
4866 };
4867 let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4868 let realtime = start_realtime_usec
4869 .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4870 writer
4871 .add_entry(&mut file, &payloads, realtime, realtime)
4872 .expect("write entry");
4873 }
4874 file.sync().expect("sync journal");
4875 }
4876
    /// Rows accumulated across all pages fetched by `collect_netdata_pages`.
    struct NetdataCollectedPages {
        // `MESSAGE` column values, in the order the pages returned them.
        messages: Vec<String>,
        // `timestamp` column values, parallel to `messages`.
        timestamps: Vec<u64>,
    }
4881
4882 fn collect_netdata_pages(
4883 directory: &std::path::Path,
4884 direction: Direction,
4885 page_size: usize,
4886 ) -> NetdataCollectedPages {
4887 let mut out = NetdataCollectedPages {
4888 messages: Vec::new(),
4889 timestamps: Vec::new(),
4890 };
4891 let mut anchor = None;
4892 for page in 0..100 {
4893 let mut request = json!({
4894 "after": 1_700_000_000,
4895 "before": 1_800_000_000,
4896 "last": page_size,
4897 "direction": direction_name(direction),
4898 "data_only": true,
4899 });
4900 if let Some(anchor) = anchor {
4901 request["anchor"] = Value::from(anchor);
4902 }
4903 let response = run_netdata_contract_request(directory, request);
4904 assert_eq!(
4905 response["status"], 200,
4906 "page {page} response = {response:#}"
4907 );
4908 let messages = response_column_strings(&response, "MESSAGE");
4909 let timestamps = response_column_u64s(&response, "timestamp");
4910 assert_eq!(messages.len(), timestamps.len());
4911 if messages.is_empty() {
4912 break;
4913 }
4914 out.messages.extend(messages);
4915 out.timestamps.extend(timestamps.iter().copied());
4916 anchor = Some(match direction {
4917 Direction::Forward => timestamps[0],
4918 Direction::Backward => *timestamps.last().expect("page timestamp"),
4919 });
4920 if timestamps.len() < page_size {
4921 break;
4922 }
4923 }
4924 assert!(!out.messages.is_empty(), "pagination returned no rows");
4925 out
4926 }
4927
4928 fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4929 NetdataJournalFunction::systemd_journal_plugin_compatible()
4930 .run_directory_request_json_with_options(
4931 directory,
4932 &request,
4933 NetdataFunctionRunOptions::from_timeout_seconds(0),
4934 )
4935 .expect("run function")
4936 }
4937
4938 fn direction_name(direction: Direction) -> &'static str {
4939 match direction {
4940 Direction::Forward => "forward",
4941 Direction::Backward => "backward",
4942 }
4943 }
4944
4945 fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4946 let index = response_column_index(response, field);
4947 response["data"]
4948 .as_array()
4949 .expect("data")
4950 .iter()
4951 .map(|row| {
4952 row.as_array().expect("row")[index]
4953 .as_str()
4954 .expect("string cell")
4955 .to_string()
4956 })
4957 .collect()
4958 }
4959
4960 fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4961 let index = response_column_index(response, field);
4962 response["data"]
4963 .as_array()
4964 .expect("data")
4965 .iter()
4966 .map(|row| {
4967 row.as_array().expect("row")[index]
4968 .as_u64()
4969 .expect("u64 cell")
4970 })
4971 .collect()
4972 }
4973
4974 fn response_column_index(response: &Value, field: &str) -> usize {
4975 response["columns"][field]["index"]
4976 .as_u64()
4977 .expect("column index") as usize
4978 }
4979
4980 fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4981 for facet in response[key].as_array().expect("facets") {
4982 if facet["id"] != field {
4983 continue;
4984 }
4985 for option in facet["options"].as_array().expect("facet options") {
4986 if option["id"] == value {
4987 return option["count"].as_u64().expect("facet count");
4988 }
4989 }
4990 panic!("{key} facet {field} missing value {value}");
4991 }
4992 panic!("{key} missing facet {field}");
4993 }
4994
4995 fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4996 let chart = &response[key]["chart"];
4997 let names = chart["view"]["dimensions"]["names"]
4998 .as_array()
4999 .expect("histogram names");
5000 let dimension_index = names
5001 .iter()
5002 .position(|name| name.as_str() == Some(value))
5003 .expect("histogram dimension");
5004 chart["result"]["data"]
5005 .as_array()
5006 .expect("histogram data")
5007 .iter()
5008 .map(|point| {
5009 point.as_array().expect("histogram point")[dimension_index + 1]
5010 .as_array()
5011 .expect("histogram cell")[0]
5012 .as_u64()
5013 .unwrap_or_default()
5014 })
5015 .sum()
5016 }
5017
5018 fn assert_unique_messages(messages: &[String]) {
5019 let mut seen = HashSet::new();
5020 for message in messages {
5021 assert!(
5022 seen.insert(message),
5023 "duplicate message {message} in {messages:?}"
5024 );
5025 }
5026 }
5027
5028 #[test]
5029 fn parses_netdata_selections_as_and_fields_or_values() {
5030 let request = json!({
5031 "after": 200_000_000,
5032 "before": 200_000_100,
5033 "direction": "forward",
5034 "last": 25,
5035 "facets": ["PRIORITY"],
5036 "selections": {
5037 "PRIORITY": ["warning", "error"],
5038 "_HOSTNAME": ["node-a"],
5039 "__logs_sources": ["all-local-system-logs"],
5040 }
5041 });
5042
5043 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5044 .expect("parse request");
5045 assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5046 assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5047 assert_eq!(parsed.direction, Direction::Forward);
5048 assert_eq!(parsed.limit, 25);
5049 assert_eq!(parsed.filters.len(), 2);
5050 assert_eq!(parsed.filters[0].field, b"PRIORITY");
5051 assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5052 assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5053 assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5054 }
5055
5056 #[test]
5057 fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5058 let request = json!({
5059 "after": 200_000_000,
5060 "before": 200_000_100,
5061 "last": 1
5062 });
5063
5064 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5065 .expect("parse request");
5066 let query =
5067 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5068
5069 assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5070 assert_eq!(parsed.limit, 2);
5071 assert_eq!(query.limit, 2);
5072 }
5073
5074 #[test]
5075 fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5076 let request = json!({
5077 "after": 200_000_000,
5078 "before": 200_000_100,
5079 "facets": ["PRIORITY"],
5080 "selections": {
5081 "PRIORITY": ["warning"]
5082 }
5083 });
5084
5085 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5086 .expect("parse request");
5087 let query =
5088 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5089
5090 assert!(!query.exclude_facet_field_filters);
5091 }
5092
5093 #[test]
5094 fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5095 let request = json!({
5096 "after": 200_000_000,
5097 "before": 200_000_100,
5098 "facets": ["PRIORITY", "_BOOT_ID"],
5099 "selections": {
5100 "PRIORITY": ["warning"],
5101 "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5102 }
5103 });
5104
5105 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5106 .expect("parse request");
5107 let query =
5108 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5109
5110 assert!(query.exclude_facet_field_filters);
5111 }
5112
5113 #[test]
5114 fn parses_netdata_fts_query_like_simple_pattern() {
5115 let (terms, positives, negatives) =
5116 parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5117
5118 assert_eq!(
5119 positives,
5120 vec![
5121 b"error".to_vec(),
5122 b"warning".to_vec(),
5123 b"escaped|pipe".to_vec(),
5124 b"!literal".to_vec(),
5125 b" a*B".to_vec(),
5126 ]
5127 );
5128 assert_eq!(negatives, vec![b"debug".to_vec()]);
5129 assert_eq!(terms.len(), 6);
5130 assert!(!terms[0].negative);
5131 assert!(terms[2].negative);
5132 assert_eq!(
5133 terms[5],
5134 ExplorerFtsPattern {
5135 parts: vec![b" a".to_vec(), b"B".to_vec()],
5136 negative: false,
5137 }
5138 );
5139
5140 let request = json!({
5141 "query": r"alpha|!debug|needle\|pipe",
5142 "facets": ["PRIORITY"],
5143 });
5144 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5145 .expect("parse request");
5146 let query =
5147 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5148 assert_eq!(
5149 query.fts_patterns,
5150 vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5151 );
5152 assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5153 assert_eq!(query.fts_terms.len(), 3);
5154 }
5155
5156 #[test]
5157 fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5158 let request = json!({
5159 "facets": ["PRIORITY", "_HOSTNAME"],
5160 "histogram": "PRIORITY",
5161 "last": 25
5162 });
5163
5164 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5165 .expect("parse request");
5166 let query =
5167 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5168
5169 assert!(!query.debug_collect_column_fields_by_row_traversal);
5170 }
5171
5172 #[test]
5173 fn netdata_function_reports_requested_facet_groups_even_when_absent() {
5174 let dir = TempDir::new().expect("tempdir");
5175 write_netdata_test_journal(dir.path(), 10);
5176 let request = json!({
5177 "after": 1_700_000_000,
5178 "before": 1_700_000_010,
5179 "facets": ["SERVICE", "MISSING_FIELD"],
5180 "histogram": "SERVICE",
5181 "last": 5,
5182 "slice": true
5183 });
5184 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5185
5186 let response = function
5187 .run_directory_request_json_with_options(
5188 dir.path(),
5189 &request,
5190 NetdataFunctionRunOptions::from_timeout_seconds(0),
5191 )
5192 .expect("run function");
5193
5194 let columns = response["columns"].as_object().expect("columns");
5195 assert!(columns.contains_key("SERVICE"));
5196 assert!(columns.contains_key("MISSING_FIELD"));
5197 let facets = response["facets"].as_array().expect("facets");
5198 assert_eq!(
5199 facets
5200 .iter()
5201 .filter_map(|facet| facet["id"].as_str())
5202 .collect::<Vec<_>>(),
5203 vec!["SERVICE", "MISSING_FIELD"]
5204 );
5205 assert_eq!(
5206 facets[1]["options"].as_array().expect("missing options"),
5207 &Vec::<Value>::new()
5208 );
5209 let accepted = response["accepted_params"]
5210 .as_array()
5211 .expect("accepted params");
5212 assert!(accepted.iter().any(|value| value == "SERVICE"));
5213 assert!(accepted.iter().any(|value| value == "MISSING_FIELD"));
5214 let histograms = response["available_histograms"]
5215 .as_array()
5216 .expect("available histograms");
5217 assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5218 assert!(
5219 histograms
5220 .iter()
5221 .any(|value| value["id"] == "MISSING_FIELD")
5222 );
5223 }
5224
5225 #[test]
5226 fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5227 let dir = TempDir::new().expect("tempdir");
5228 write_netdata_test_journal(dir.path(), 10);
5229 let request = json!({
5230 "after": 1_700_000_000,
5231 "before": 1_700_000_010,
5232 "facets": ["PRIORITY"],
5233 "histogram": "PRIORITY",
5234 "selections": {
5235 "SERVICE": ["missing"]
5236 },
5237 "last": 5,
5238 "slice": true
5239 });
5240 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5241
5242 let response = function
5243 .run_directory_request_json_with_options(
5244 dir.path(),
5245 &request,
5246 NetdataFunctionRunOptions::from_timeout_seconds(0),
5247 )
5248 .expect("run function");
5249
5250 let facets = response["facets"].as_array().expect("facets");
5251 assert_eq!(facets.len(), 1);
5252 assert_eq!(facets[0]["id"], "PRIORITY");
5253 let options = facets[0]["options"].as_array().expect("options");
5254 assert!(options.iter().any(|option| {
5255 option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5256 }));
5257 assert_eq!(response["items"]["matched"], 0);
5258 }
5259
5260 #[test]
5261 fn netdata_function_api_reports_progress() {
5262 let dir = TempDir::new().expect("tempdir");
5263 write_netdata_test_journal(dir.path(), 9_000);
5264 let request = json!({
5265 "after": 1_700_000_000,
5266 "before": 1_700_000_010,
5267 "facets": ["SERVICE"],
5268 "histogram": "SERVICE",
5269 "last": 0
5270 });
5271 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5272 let mut reports = 0u64;
5273 let mut progress = |progress: NetdataFunctionProgress| {
5274 reports = reports.saturating_add(1);
5275 assert_eq!(progress.current_file, 1);
5276 assert_eq!(progress.total_files, 1);
5277 assert!(progress.stats.rows_examined <= 9_000);
5278 };
5279 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5280 options.progress_interval = Duration::ZERO;
5281 options.progress_callback = Some(&mut progress);
5282
5283 let response = function
5284 .run_directory_request_json_with_options(dir.path(), &request, options)
5285 .expect("run function");
5286
5287 assert_eq!(response["status"], 200);
5288 assert!(reports > 0);
5289 assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5290 }
5291
/// With the progress interval left at whatever `from_timeout_seconds(0)`
/// provides, a scan over the 10-entry test journal must produce exactly one
/// progress report, and that report must show 10 examined rows.
#[test]
fn netdata_function_api_reports_file_end_progress_for_small_scans() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // Track how often the callback ran and what the latest snapshot said.
    let mut callback_count = 0u64;
    let mut rows_in_last_report = 0u64;
    let mut on_progress = |update: NetdataFunctionProgress| {
        callback_count = callback_count.saturating_add(1);
        rows_in_last_report = update.stats.rows_examined;
    };

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_callback = Some(&mut on_progress);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(callback_count, 1);
    assert_eq!(rows_in_last_report, 10);
}
5321
/// Two journals are written, one starting at 1.6e15 usec and one at 1.7e15
/// usec. The request window only covers the latter, so the response must
/// report one matched file and progress must be reported as `(1, 1)`.
#[test]
fn netdata_function_progress_counts_only_query_files() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(
        journal_dir.path(),
        "old-window.journal",
        10,
        1_600_000_000_000_000,
    );
    write_named_netdata_test_journal(
        journal_dir.path(),
        "current-window.journal",
        10,
        1_700_000_000_000_000,
    );
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // Record every (current_file, total_files) pair the callback receives.
    let mut file_positions = Vec::new();
    let mut on_progress = |update: NetdataFunctionProgress| {
        file_positions.push((update.current_file, update.total_files));
    };

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_callback = Some(&mut on_progress);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(response["_journal_files"]["matched"], 1);
    assert_eq!(file_positions, vec![(1, 1)]);
}
5360
/// A cancellation callback that always answers `true` must yield a 499
/// response whose JSON object holds exactly two keys.
#[test]
fn netdata_function_api_reports_cancellation() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let always_cancelled = || true;
    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.cancellation_callback = Some(&always_cancelled);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");

    let top_level_keys = response.as_object().expect("response object").len();
    assert_eq!(
        top_level_keys,
        2,
        "plugin-compatible function errors only include status and errorMessage"
    );
}
5389
/// The progress callback raises a shared flag once any row has been examined;
/// the cancellation callback reads that flag. The run must then end with a 499
/// "Request cancelled." response.
#[test]
fn netdata_function_api_cancels_during_active_scan() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // `Cell` lets both closures share the flag without a mutable borrow.
    let cancel_flag = Cell::new(false);
    let mut callback_count = 0u64;
    let mut on_progress = |update: NetdataFunctionProgress| {
        callback_count = callback_count.saturating_add(1);
        if update.stats.rows_examined > 0 {
            cancel_flag.set(true);
        }
    };
    let cancel_requested = || cancel_flag.get();

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_interval = Duration::ZERO;
    run_options.progress_callback = Some(&mut on_progress);
    run_options.cancellation_callback = Some(&cancel_requested);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");
    assert!(callback_count > 0);
    assert!(cancel_flag.get());
}
5425
/// The progress callback requests cancellation unconditionally on its first
/// invocation. Even for the small 10-entry journal the run must still finish
/// as a 499 "Request cancelled." response.
#[test]
fn netdata_function_api_honors_cancellation_after_final_file_progress() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let cancel_flag = Cell::new(false);
    let mut on_progress = |_update: NetdataFunctionProgress| {
        cancel_flag.set(true);
    };
    let cancel_requested = || cancel_flag.get();

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_callback = Some(&mut on_progress);
    run_options.cancellation_callback = Some(&cancel_requested);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");
    assert!(cancel_flag.get());
}
5455
/// A zero-length timeout must not turn into an error: the response stays at
/// status 200 but is flagged `partial` and carries a warning message with the
/// timed-out title.
#[test]
fn netdata_function_api_reports_timeout_as_partial_table() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // Start from the defaults and override only the timeout.
    let mut run_options = NetdataFunctionRunOptions::default();
    run_options.timeout = Some(Duration::ZERO);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(response["partial"], true);

    let message = &response["message"];
    assert_eq!(message["status"], "warning");
    assert_eq!(message["title"], "Query timed-out, incomplete data. ");
}
5485
/// With `"sampling": 20` on a 5 000-entry stepped journal, the response must
/// expose non-zero `sampled`, `unsampled` and `estimated` counters under
/// `_sampling`, mirror `estimated` under `items`, report a smaller
/// `items.unsampled` than `_sampling.unsampled`, and attach a "notice" message.
#[test]
fn netdata_function_api_reports_sampling_counters() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        5_000,
        1_700_000_000_000_000,
        1_000,
    );
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_005,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 5,
        "sampling": 20
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let response = journal_function
        .run_directory_request_json_with_options(
            journal_dir.path(),
            &query,
            NetdataFunctionRunOptions::from_timeout_seconds(0),
        )
        .expect("run function");

    // Missing or non-numeric counters read as zero, exactly like
    // `as_u64().unwrap_or_default()` at each call site would.
    let counter =
        |section: &str, key: &str| response[section][key].as_u64().unwrap_or_default();

    assert_eq!(response["status"], 200);
    assert!(counter("_sampling", "sampled") > 0);
    assert!(counter("_sampling", "unsampled") > 0);
    assert!(counter("_sampling", "estimated") > 0);
    assert_eq!(
        response["items"]["estimated"],
        response["_sampling"]["estimated"]
    );
    assert!(counter("items", "unsampled") < counter("_sampling", "unsampled"));
    assert_eq!(response["message"]["status"], "notice");
}
5545
/// A `data_only` request that also asks for sampling must come back with
/// status 200 and no `_sampling` key in the response.
#[test]
fn netdata_function_api_disables_sampling_for_data_only() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 5_000);
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "data_only": true,
        "last": 5,
        "sampling": 20
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert!(response.get("_sampling").is_none());
}
5570
/// With neither bound supplied and "now" at 1 000 000 000 s, the window must
/// span from 3 600 s before now up to the last microsecond of the now-second.
#[test]
fn normalizes_missing_time_window_to_last_hour_like_plugin() {
    let window = normalize_time_window(1_000_000_000, None, None);
    let expected = (Some(999_996_400_000_000), Some(1_000_000_000_999_999));
    assert_eq!(window, expected);
}
5578
/// When `after` is later than `before`, the two bounds are expected to come
/// back swapped into ascending order.
#[test]
fn normalizes_inverted_time_window_like_plugin() {
    let window = normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000));
    let expected = (Some(200_000_000_000_000), Some(200_000_100_999_999));
    assert_eq!(window, expected);
}
5586
/// When both bounds name the same second, the lower bound is expected to move
/// back by 3 600 s while the upper bound stays on that second.
#[test]
fn normalizes_equal_time_window_like_plugin() {
    let window = normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000));
    let expected = (Some(199_996_400_000_000), Some(200_000_000_999_999));
    assert_eq!(window, expected);
}
5594
/// Small bound values (100 and 200) must be resolved against "now"
/// (1 000 000 000 s) to the window pinned by the expected tuple below.
#[test]
fn normalizes_relative_time_window_like_plugin() {
    let window = normalize_time_window(1_000_000_000, Some(100), Some(200));
    let expected = (Some(999_999_701_000_000), Some(999_999_800_999_999));
    assert_eq!(window, expected);
}
5602
/// With only `before` supplied, the lower bound must be derived from it; the
/// expected tuple pins the exact values.
#[test]
fn normalizes_missing_after_with_supplied_before_like_plugin() {
    let window = normalize_time_window(1_000_000_000, None, Some(200_000_000));
    let expected = (Some(199_999_401_000_000), Some(200_000_000_999_999));
    assert_eq!(window, expected);
}
5610
/// Checks the display mapping of `PRIORITY` 7 to "debug" and
/// `SYSLOG_FACILITY` 3 to "daemon" in the data scope, plus the row-severity
/// mapping of priorities 3 and 6.
#[test]
fn systemd_profile_transforms_priority_and_facility_for_display() {
    let display_context = DisplayContext::default();
    let journal_profile = SystemdJournalProfile;

    let priority_label =
        journal_profile.field_display_value(&display_context, DisplayScope::Data, "PRIORITY", b"7");
    assert_eq!(priority_label, json!("debug"));

    let facility_label = journal_profile.field_display_value(
        &display_context,
        DisplayScope::Data,
        "SYSLOG_FACILITY",
        b"3",
    );
    assert_eq!(facility_label, json!("daemon"));

    assert_eq!(priority_to_row_severity(b"3"), "critical");
    assert_eq!(priority_to_row_severity(b"6"), "normal");
}
5626
/// Walks `dynamic_process_name` through a sequence of field sets, adding and
/// removing identifier and PID fields step by step, and pins the rendered name
/// after each change.
#[test]
fn dynamic_process_name_matches_plugin_fallback_order() {
    let mut entry_fields = BTreeMap::new();
    entry_fields.insert(String::from("SYSLOG_IDENTIFIER"), vec![b"syslog".to_vec()]);
    entry_fields.insert(String::from("_COMM"), vec![b"comm".to_vec()]);
    entry_fields.insert(String::from("_PID"), vec![b"42".to_vec()]);
    entry_fields.insert(String::from("SYSLOG_PID"), vec![b"99".to_vec()]);
    assert_eq!(dynamic_process_name(&entry_fields), "syslog[42]");

    // A container name takes over the name part.
    entry_fields.insert(String::from("CONTAINER_NAME"), vec![b"container".to_vec()]);
    assert_eq!(dynamic_process_name(&entry_fields), "container[42]");

    // Only `_COMM` and `SYSLOG_PID` remain.
    for removed in ["CONTAINER_NAME", "SYSLOG_IDENTIFIER", "_PID"] {
        entry_fields.remove(removed);
    }
    assert_eq!(dynamic_process_name(&entry_fields), "comm[-]");

    // `_PID` present but holding an empty value.
    entry_fields.insert(String::from("_PID"), vec![Vec::new()]);
    assert_eq!(dynamic_process_name(&entry_fields), "comm");

    // With `_COMM` and `_PID` gone, `_EXE` alone yields the "-" placeholder.
    for removed in ["_COMM", "_PID"] {
        entry_fields.remove(removed);
    }
    entry_fields.insert(String::from("_EXE"), vec![b"/usr/bin/app".to_vec()]);
    assert_eq!(dynamic_process_name(&entry_fields), "-");
}
5652
/// Two values that share a `NETDATA_FACET_MAX_VALUE_LENGTH`-byte prefix and
/// differ only beyond it must land under a single key equal to that prefix,
/// with their counts (2 and 3) summed.
#[test]
fn facet_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let long_first = [shared_prefix.as_slice(), b"-first".as_slice()].concat();
    let long_second = [shared_prefix.as_slice(), b"-second".as_slice()].concat();

    let mut facet_counts = BTreeMap::new();
    add_netdata_facet_count(&mut facet_counts, &long_first, 2);
    add_netdata_facet_count(&mut facet_counts, &long_second, 3);

    assert_eq!(facet_counts.len(), 1);
    assert_eq!(facet_counts.get(&shared_prefix), Some(&5));
}
5668
/// Histogram rendering must fold two over-long values sharing a
/// `NETDATA_FACET_MAX_VALUE_LENGTH`-byte prefix into one label equal to the
/// prefix, with a combined count of 5.
#[test]
fn histogram_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let long_first = [shared_prefix.as_slice(), b"-first".as_slice()].concat();
    let long_second = [shared_prefix.as_slice(), b"-second".as_slice()].concat();

    let mut bucket_values = HashMap::new();
    bucket_values.insert(long_first, 2);
    bucket_values.insert(long_second, 3);

    let only_bucket = ExplorerHistogramBucket {
        start_realtime_usec: 1_000_000,
        end_realtime_usec: 2_000_000,
        values: bucket_values,
    };
    let histogram = ExplorerHistogram {
        field: b"TEST_FIELD".to_vec(),
        buckets: vec![only_bucket],
    };

    let journal_function = NetdataJournalFunction::systemd_journal();
    let rendered = journal_function.build_histogram(&DisplayContext::default(), &histogram, None);
    let chart_result = &rendered["chart"]["result"];
    let labels = chart_result["labels"].as_array().expect("labels");

    assert_eq!(labels.len(), 2);
    assert_eq!(
        labels[1],
        Value::String(String::from_utf8(shared_prefix).unwrap())
    );
    assert_eq!(chart_result["data"][0][1][0], json!(5));
}
5698
/// A histogram whose single bucket has no values must still render empty
/// `names`/`ids` arrays in the chart metadata; with one value ("warning" = 7)
/// the dimension names and the `sts.min` array must reflect it.
#[test]
fn histogram_chart_metadata_includes_empty_dimension_arrays() {
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // Both scenarios share the field name and bucket bounds; only the bucket
    // contents differ.
    let single_bucket_histogram = |values| ExplorerHistogram {
        field: b"TRAP_SEVERITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_700_000_000_000_000,
            end_realtime_usec: 1_700_000_005_000_000,
            values,
        }],
    };

    let empty_chart = journal_function.build_histogram(
        &DisplayContext::default(),
        &single_bucket_histogram(HashMap::new()),
        None,
    );
    let empty_view_dimensions = &empty_chart["chart"]["view"]["dimensions"];
    assert_eq!(empty_view_dimensions["names"], json!([]));
    assert_eq!(empty_view_dimensions["ids"], json!([]));
    assert_eq!(empty_chart["chart"]["db"]["dimensions"]["names"], json!([]));

    let mut warning_only = HashMap::new();
    warning_only.insert(b"warning".to_vec(), 7);
    let populated_chart = journal_function.build_histogram(
        &DisplayContext::default(),
        &single_bucket_histogram(warning_only),
        None,
    );
    let populated_view_dimensions = &populated_chart["chart"]["view"]["dimensions"];
    assert_eq!(populated_view_dimensions["names"], json!(["warning"]));
    assert_eq!(populated_view_dimensions["sts"]["min"], json!([7]));
}
5743
/// Rows sharing a timestamp must be spread apart by one microsecond each:
/// downwards for a backward listing, upwards for a forward listing.
#[test]
fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
    let mut newest_first = vec![
        test_located_row(100),
        test_located_row(100),
        test_located_row(100),
        test_located_row(90),
    ];
    make_row_timestamps_unique(&mut newest_first, Direction::Backward);
    let backward_timestamps: Vec<_> = newest_first
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(backward_timestamps, vec![100, 99, 98, 90]);

    let mut oldest_first = vec![
        test_located_row(90),
        test_located_row(100),
        test_located_row(100),
        test_located_row(100),
    ];
    make_row_timestamps_unique(&mut oldest_first, Direction::Forward);
    let forward_timestamps: Vec<_> = oldest_first
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(forward_timestamps, vec![90, 100, 101, 102]);
}
5776
/// Feeds five timestamps around a forward anchor at ...005 s into a page
/// window with `last = 2` and pins the resulting matched/before/after
/// counters.
#[test]
fn page_window_counts_forward_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_005_000_000u64,
        "direction": "forward",
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut page_window = NetdataPageWindow::for_request(&request);

    // Two timestamps below the anchor, three above it.
    let observed = [
        1_700_000_003_000_000,
        1_700_000_004_000_000,
        1_700_000_006_000_000,
        1_700_000_007_000_000,
        1_700_000_008_000_000,
    ];
    for timestamp in observed {
        page_window.observe(timestamp);
    }

    let totals = page_window.counters();
    assert_eq!(totals.matched, 3);
    assert_eq!(totals.before, 1);
    assert_eq!(totals.after, 2);
}
5808
/// Feeds four timestamps around a backward anchor at ...008 s into a page
/// window with `last = 2` and pins the resulting matched/before/after
/// counters.
#[test]
fn page_window_counts_backward_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "direction": "backward",
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut page_window = NetdataPageWindow::for_request(&request);

    // One timestamp above the anchor, three below it.
    let observed = [
        1_700_000_009_000_000,
        1_700_000_007_000_000,
        1_700_000_006_000_000,
        1_700_000_005_000_000,
    ];
    for timestamp in observed {
        page_window.observe(timestamp);
    }

    let totals = page_window.counters();
    assert_eq!(totals.matched, 3);
    assert_eq!(totals.before, 1);
    assert_eq!(totals.after, 1);
}
5839
/// For a tail request (`data_only` + `tail`) anchored at ...008 s, observes
/// one timestamp above, one equal to, and one below the anchor and pins the
/// resulting counters.
#[test]
fn page_window_counts_tail_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "if_modified_since": 1_700_000_008_000_000u64,
        "data_only": true,
        "tail": true,
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut page_window = NetdataPageWindow::for_request(&request);

    let observed = [
        1_700_000_009_000_000,
        1_700_000_008_000_000,
        1_700_000_007_000_000,
    ];
    for timestamp in observed {
        page_window.observe(timestamp);
    }

    let totals = page_window.counters();
    assert_eq!(totals.matched, 1);
    assert_eq!(totals.before, 0);
    assert_eq!(totals.after, 2);
}
5871
/// A tail request anchored at the first entry of a two-entry journal, with a
/// `SERVICE = even` selection, must answer 200 with an empty `MESSAGE` column.
#[test]
fn netdata_function_tail_anchor_with_newer_filtered_out_rows_returns_empty_200() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_entry_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        2,
        first_entry_usec,
        1_000_000,
    );
    let query = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_002,
        "anchor": first_entry_usec,
        "if_modified_since": first_entry_usec,
        "data_only": true,
        "tail": true,
        "last": 5,
        "selections": {
            "SERVICE": ["even"]
        }
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    let returned_messages = response_column_strings(&response, "MESSAGE");
    assert_eq!(returned_messages, Vec::<String>::new());
}
5911
/// Pages through a seven-entry journal (entries 1 usec apart) two rows at a
/// time in each direction and checks that every row is returned exactly once,
/// in the order pinned below.
#[test]
fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_entry_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "paging-contract.journal",
        7,
        first_entry_usec,
        1,
    );

    // Backward paging: strictly newest to oldest.
    let backward_pages = collect_netdata_pages(journal_dir.path(), Direction::Backward, 2);
    assert_eq!(
        backward_pages.messages,
        vec!["row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"]
    );
    assert_unique_messages(&backward_pages.messages);
    let expected_backward: Vec<_> = (0..=6)
        .rev()
        .map(|offset| first_entry_usec + offset)
        .collect();
    assert_eq!(backward_pages.timestamps, expected_backward);

    // Forward paging: pages advance in time, but rows inside each page are
    // still listed newest first.
    let forward_pages = collect_netdata_pages(journal_dir.path(), Direction::Forward, 2);
    assert_eq!(
        forward_pages.messages,
        vec!["row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"]
    );
    assert_unique_messages(&forward_pages.messages);
    let expected_forward: Vec<_> = vec![1, 0, 3, 2, 5, 4, 6]
        .into_iter()
        .map(|offset| first_entry_usec + offset)
        .collect();
    assert_eq!(forward_pages.timestamps, expected_forward);
}
5966
/// Tail polls anchored at entry 2 of a five-entry journal must return only
/// rows 4 and 3, and echo "backward" as the request direction whichever
/// direction was asked for. A poll anchored at the last entry must answer 304.
#[test]
fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_entry_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "tail-contract.journal",
        5,
        first_entry_usec,
        1,
    );
    let tail_anchor = first_entry_usec + 2;

    for asked_direction in [Direction::Backward, Direction::Forward] {
        let poll = json!({
            "after": 1_700_000_000,
            "before": 1_700_000_010,
            "last": 5,
            "direction": direction_name(asked_direction),
            "data_only": true,
            "tail": true,
            "if_modified_since": tail_anchor,
            "anchor": tail_anchor,
        });
        let response = run_netdata_contract_request(journal_dir.path(), poll);

        assert_eq!(response["status"], 200);
        assert_eq!(response["_request"]["direction"], "backward");
        assert_eq!(
            response_column_strings(&response, "MESSAGE"),
            vec!["row-4", "row-3"]
        );
        assert_eq!(
            response_column_u64s(&response, "timestamp"),
            vec![first_entry_usec + 4, first_entry_usec + 3]
        );
    }

    // Anchoring at the newest entry leaves nothing new to report.
    let newest_entry_usec = first_entry_usec + 4;
    let unchanged = run_netdata_contract_request(
        journal_dir.path(),
        json!({
            "after": 1_700_000_000,
            "before": 1_700_000_010,
            "last": 5,
            "direction": "backward",
            "data_only": true,
            "tail": true,
            "if_modified_since": newest_entry_usec,
            "anchor": newest_entry_usec,
        }),
    );
    assert_eq!(unchanged["status"], 304);
    assert_eq!(
        unchanged["errorMessage"],
        "No new data since the previous call."
    );
}
6025
/// A `delta` tail poll anchored at entry 1 of a five-entry journal must return
/// rows 4 and 3 and report delta facet counts, delta histogram totals and
/// `items_delta` counters matching the values pinned below.
#[test]
fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_entry_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "delta-contract.journal",
        5,
        first_entry_usec,
        1,
    );
    let tail_anchor = first_entry_usec + 1;

    let poll = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "last": 2,
        "direction": "backward",
        "data_only": true,
        "delta": true,
        "tail": true,
        "if_modified_since": tail_anchor,
        "anchor": tail_anchor,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
    });
    let response = run_netdata_contract_request(journal_dir.path(), poll);

    assert_eq!(response["status"], 200);
    assert_eq!(
        response_column_strings(&response, "MESSAGE"),
        vec!["row-4", "row-3"]
    );

    // Facet deltas first, then histogram deltas, each for "even" then "odd".
    for (service, expected_count) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_facet_count(&response, "facets_delta", "SERVICE", service),
            expected_count
        );
    }
    for (service, expected_total) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_histogram_total(&response, "histogram_delta", service),
            expected_total
        );
    }

    let delta_items = response["items_delta"].as_object().expect("items_delta");
    assert_eq!(delta_items["matched"], 3);
    assert_eq!(delta_items["returned"], 2);
    assert_eq!(delta_items["after"], 2);
}
6082
/// Feeding the same timestamp three times into a forward adjuster must yield
/// 10, 11 and 12 in that order.
#[test]
fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
    let mut forward_adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);

    for expected in [10, 11, 12] {
        assert_eq!(forward_adjuster.adjust(10), expected);
    }
}
6091
/// Feeding the same timestamp three times into a backward adjuster must yield
/// 10, 9 and 8 in that order.
#[test]
fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
    let mut backward_adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);

    for expected in [10, 9, 8] {
        assert_eq!(backward_adjuster.adjust(10), expected);
    }
}
6100
/// The default systemd profile must render `_UID` and `_GID` values of "0" in
/// the facet scope as the string "0".
#[test]
fn systemd_profile_keeps_user_group_ids_raw_by_default() {
    let display_context = DisplayContext::default();
    let journal_profile = SystemdJournalProfile;

    let uid_label =
        journal_profile.field_display_value(&display_context, DisplayScope::Facet, "_UID", b"0");
    assert_eq!(uid_label, json!("0"));

    let gid_label =
        journal_profile.field_display_value(&display_context, DisplayScope::Facet, "_GID", b"0");
    assert_eq!(gid_label, json!("0"));
}
6114
/// The plugin-compatible profile must render `_UID` 0 and `_GID` 0 in the
/// facet scope as "root".
///
/// NOTE(review): this presumably relies on the host naming group 0 "root";
/// confirm for every Unix target the test is built for.
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
    let display_context = DisplayContext::default();
    let plugin_profile = SystemdJournalPluginProfile;

    let uid_label =
        plugin_profile.field_display_value(&display_context, DisplayScope::Facet, "_UID", b"0");
    assert_eq!(uid_label, json!("root"));

    let gid_label =
        plugin_profile.field_display_value(&display_context, DisplayScope::Facet, "_GID", b"0");
    assert_eq!(gid_label, json!("root"));
}
6129
/// Resolving uid 0 and gid 0 in both the facet and data scopes must leave
/// exactly one entry in each of the context's uid and gid display caches.
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_caches_user_group_resolution() {
    let display_context = DisplayContext::default();
    let plugin_profile = SystemdJournalPluginProfile;

    // Same lookup order as the caches would see in production use: each id is
    // resolved for the facet scope first, then for the data scope.
    for id_field in ["_UID", "_GID"] {
        let facet_label =
            plugin_profile.field_display_value(&display_context, DisplayScope::Facet, id_field, b"0");
        assert_eq!(facet_label, json!("root"));

        let data_label =
            plugin_profile.field_display_value(&display_context, DisplayScope::Data, id_field, b"0");
        assert_eq!(data_label, json!("root"));
    }

    assert_eq!(display_context.uid_display_cache.borrow().len(), 1);
    assert_eq!(display_context.gid_display_cache.borrow().len(), 1);
}
6156
/// A request starting one second inside the maximum journal-vs-realtime slack
/// after the file's tail entry must still count as overlapping; one starting
/// one second beyond the slack must not.
#[test]
fn file_overlap_uses_netdata_max_realtime_slack() {
    let head_seconds = 200_000_000u64;
    let tail_seconds = 200_000_100u64;
    let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;

    // Only the head/tail realtime fields carry meaningful values here.
    let header = crate::FileHeader {
        signature: *b"LPKSHHRH",
        compatible_flags: 0,
        incompatible_flags: 0,
        state: 0,
        header_size: 0,
        n_entries: 0,
        head_entry_realtime: head_seconds * 1_000_000,
        tail_entry_realtime: tail_seconds * 1_000_000,
        head_entry_seqnum: 0,
        tail_entry_seqnum: 0,
        tail_entry_boot_id: [0; 16],
        seqnum_id: [0; 16],
    };
    let config = NetdataFunctionConfig::systemd_journal();
    let request_for_window = |after: u64, before: u64| {
        NetdataRequest::parse(&json!({ "after": after, "before": before }), &config)
            .expect("parse request")
    };
    let window_end = tail_seconds + slack_seconds + 500;

    let within_slack = request_for_window(tail_seconds + slack_seconds - 1, window_end);
    assert!(file_may_overlap_request(header, &within_slack));

    let beyond_slack = request_for_window(tail_seconds + slack_seconds + 1, window_end);
    assert!(!file_may_overlap_request(header, &beyond_slack));
}
6198
/// Pins the comparator outcome for files differing in last message time, in
/// modification time, and in first message time. In each backward comparison
/// the "newer" file must order first (`Less`); the forward comparison flips.
#[test]
fn journal_file_order_matches_plugin_comparator_shape() {
    // All fixtures share the default journal-vs-realtime delta.
    let order_info = |first_usec, last_usec, modified_usec| JournalFileOrderInfo {
        msg_first_realtime_usec: first_usec,
        msg_last_realtime_usec: last_usec,
        file_last_modified_usec: modified_usec,
        journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
    };

    let baseline = order_info(100, 200, 200);

    // Later last-message time, despite an earlier modification time.
    let later_last_message = order_info(100, 300, 100);
    assert_eq!(
        compare_journal_file_order(&later_last_message, &baseline, Direction::Backward),
        Ordering::Less
    );
    assert_eq!(
        compare_journal_file_order(&later_last_message, &baseline, Direction::Forward),
        Ordering::Greater
    );

    // Same message range, later modification time.
    let later_modification = order_info(100, 200, 300);
    assert_eq!(
        compare_journal_file_order(&later_modification, &baseline, Direction::Backward),
        Ordering::Less
    );

    // Same last message and modification time, later first message.
    let later_first_message = order_info(150, 200, 200);
    assert_eq!(
        compare_journal_file_order(&later_first_message, &baseline, Direction::Backward),
        Ordering::Less
    );
}
6244
/// Recording 300, 100 and 200 for the same boot id must leave 100 stored.
#[test]
fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
    let mut earliest_by_boot = BTreeMap::new();
    for observed_usec in [300, 100, 200] {
        record_boot_first_realtime(&mut earliest_by_boot, b"boot-a".to_vec(), observed_usec);
    }

    assert_eq!(earliest_by_boot.get(b"boot-a".as_slice()), Some(&100));
}
6254
/// Merging a histogram whose single bucket ends at a different time than the
/// target's bucket must fail with the "inconsistent ... bucket shape"
/// `Unsupported` error.
#[test]
fn merge_histogram_rejects_inconsistent_bucket_shape() {
    // Same field and bucket start; only the bucket end differs between the two.
    let priority_histogram = |bucket_end_usec| ExplorerHistogram {
        field: b"PRIORITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_000_000,
            end_realtime_usec: bucket_end_usec,
            values: HashMap::new(),
        }],
    };

    let mut merged = Some(priority_histogram(2_000_000));
    let mismatched = priority_histogram(1_500_000);

    let failure = merge_histogram(&mut merged, mismatched).expect_err("shape mismatch");
    assert!(matches!(
        failure,
        SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
    ));
}
6280
/// A journal placed two directory levels below the scan root must be found,
/// with nothing skipped and no errors reported.
#[test]
fn collect_journal_files_recurses_nested_directories() {
    let scan_root = TempDir::new().expect("tempdir");
    let nested_dir = scan_root.path().join("machine").join("nested");
    write_named_netdata_test_journal(&nested_dir, "system.journal", 1, 1_700_000_000_000_000);

    let collection = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());

    let found_name = collection.files[0]
        .file_name()
        .and_then(|name| name.to_str());
    assert_eq!(found_name, Some("system.journal"));
}
6299
/// A directory reachable both directly and through a symlink must contribute
/// its journal only once, with nothing skipped and no errors reported.
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_directories() {
    let scan_root = TempDir::new().expect("tempdir");
    let real_dir = scan_root.path().join("real");
    let alias_dir = scan_root.path().join("link");
    write_named_netdata_test_journal(&real_dir, "system.journal", 1, 1_700_000_000_000_000);
    std::os::unix::fs::symlink(&real_dir, &alias_dir).expect("symlink");

    let collection = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());
}
6315
/// A journal file plus a symlink pointing at it in the same directory must be
/// collected as a single file, with nothing skipped and no errors reported.
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_files() {
    let scan_root = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(
        scan_root.path(),
        "system.journal",
        1,
        1_700_000_000_000_000,
    );
    let real_file = scan_root.path().join("system.journal");
    let alias_file = scan_root.path().join("system-copy.journal");
    std::os::unix::fs::symlink(real_file, alias_file).expect("symlink");

    let collection = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());
}
6333
/// A subdirectory with mode `0o000` must not abort the scan: the readable
/// journal is still collected, while the locked directory is counted as
/// skipped and reported in `errors` with its name.
///
/// The test returns early when running as root, since a privileged process
/// is not stopped by the directory's permission bits.
#[cfg(unix)]
#[test]
fn collect_journal_files_reports_unreadable_subdirectories() {
    use std::os::unix::fs::PermissionsExt;

    // SAFETY: `geteuid` takes no arguments, has no preconditions and always
    // succeeds.
    if unsafe { libc::geteuid() } == 0 {
        return;
    }

    let dir = TempDir::new().expect("tempdir");
    std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
    let locked = dir.path().join("locked");
    std::fs::create_dir(&locked).expect("locked dir");
    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
        .expect("lock dir");

    // Hold on to the raw result: the directory must be made accessible again
    // before anything can panic, otherwise the temporary directory cannot be
    // removed when `dir` is dropped.
    let outcome = collect_journal_files(dir.path());

    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
        .expect("unlock dir");

    let collection = outcome.expect("collect files");
    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 1);
    assert_eq!(collection.errors.len(), 1);
    assert!(collection.errors[0].contains("locked"));
}
6359
/// When the caller-provided metadata only carries the first message timestamp,
/// the summary must keep that value and still report a last timestamp
/// (presumably taken from the journal file header, as the test name suggests).
#[test]
fn source_summary_fills_missing_caller_metadata_from_header() {
    let tmp = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(tmp.path(), "system.journal", 2, 1_700_000_000_000_000);
    let journal_path = tmp.path().join("system.journal");

    // Caller metadata deliberately leaves everything but the first timestamp unset.
    let partial_metadata = NetdataJournalFileMetadata {
        msg_first_realtime_usec: Some(1_699_999_999_000_000),
        ..NetdataJournalFileMetadata::default()
    };
    let mut caller_state = TestNetdataState::default();
    caller_state
        .metadata
        .insert(journal_path.clone(), partial_metadata);

    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut caller_state),
        ..NetdataFunctionRunOptions::default()
    };

    let summary = JournalSourceSummary::from_paths(
        &[journal_path],
        ReaderOptions::default(),
        &run_options,
    );

    assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
    assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
}
6383
/// The summary text must say "covering off" while the first/last timestamps are
/// 999_999 usec apart and "covering 1s" once they are 1_000_000 usec apart.
#[test]
fn source_summary_coverage_is_off_below_one_second() {
    let window_start = 1_700_000_000_000_000;
    let mut summary = JournalSourceSummary {
        files: 1,
        total_size: 0,
        first_realtime_usec: Some(window_start),
        last_realtime_usec: Some(window_start + 999_999),
    };

    // One microsecond short of a full second.
    let sub_second_info = summary.info();
    assert!(sub_second_info.contains("covering off"));

    // Exactly one second.
    summary.last_realtime_usec = Some(window_start + 1_000_000);
    let one_second_info = summary.info();
    assert!(one_second_info.contains("covering 1s"));
}
6399
/// The plugin-compatible function must describe the `__logs_sources` required
/// parameter with the default selector name and help text.
#[test]
fn netdata_function_uses_default_source_selector_metadata() {
    let tmp = TempDir::new().expect("tempdir");
    let function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let info = function
        .run_directory_request_json(tmp.path(), &json!({"info": true}))
        .expect("run info request");

    let required_params = info["required_params"]
        .as_array()
        .expect("required params");
    let selector = required_params
        .iter()
        .find(|param| param["id"] == "__logs_sources")
        .expect("source selector param");

    assert_eq!(selector["id"], "__logs_sources");
    assert_eq!(selector["name"], DEFAULT_SOURCE_SELECTOR_NAME);
    assert_eq!(selector["help"], DEFAULT_SOURCE_SELECTOR_HELP);
}
6417
/// A configuration that overrides the selector name and help text must have
/// both values echoed on the `__logs_sources` required parameter.
#[test]
fn netdata_function_uses_custom_source_selector_metadata() {
    let tmp = TempDir::new().expect("tempdir");

    let config = NetdataFunctionConfig {
        source_selector_name: "Trap Jobs".to_string(),
        source_selector_help: "Select the trap job to query".to_string(),
        ..NetdataFunctionConfig::systemd_journal()
    };
    let function = NetdataJournalFunction::new(config, SystemdJournalPluginProfile);

    let info = function
        .run_directory_request_json(tmp.path(), &json!({"info": true}))
        .expect("run info request");

    let required_params = info["required_params"]
        .as_array()
        .expect("required params");
    let selector = required_params
        .iter()
        .find(|param| param["id"] == "__logs_sources")
        .expect("source selector param");

    assert_eq!(selector["id"], "__logs_sources");
    assert_eq!(selector["name"], "Trap Jobs");
    assert_eq!(selector["help"], "Select the trap job to query");
}
6440
/// Empty selector name/help strings in the configuration must not leak into the
/// response: the `__logs_sources` parameter falls back to the default texts.
#[test]
fn netdata_function_empty_source_selector_metadata_falls_back_to_defaults() {
    let tmp = TempDir::new().expect("tempdir");

    // Blank out both texts on an otherwise standard configuration.
    let mut config = NetdataFunctionConfig::systemd_journal();
    config.source_selector_name = String::new();
    config.source_selector_help = String::new();
    let function = NetdataJournalFunction::new(config, SystemdJournalPluginProfile);

    let info = function
        .run_directory_request_json(tmp.path(), &json!({"info": true}))
        .expect("run info request");

    let required_params = info["required_params"]
        .as_array()
        .expect("required params");
    let selector = required_params
        .iter()
        .find(|param| param["id"] == "__logs_sources")
        .expect("source selector param");

    assert_eq!(selector["id"], "__logs_sources");
    assert_eq!(selector["name"], DEFAULT_SOURCE_SELECTOR_NAME);
    assert_eq!(selector["help"], DEFAULT_SOURCE_SELECTOR_HELP);
}
6463
/// Selecting the `all-local-system-logs` group must set the request's source
/// type, echo that type back, echo the selection entry as JSON null, and make
/// the request accept a system journal path while rejecting a user journal path.
#[test]
fn source_selection_echoes_and_filters_known_groups() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "selections": {
            "__logs_sources": ["all-local-system-logs"]
        }
    });
    let request =
        NetdataRequest::parse(&payload, &config).expect("parse source-filtered request");

    assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);

    let echoed_source_type = request.echo.get("source_type").and_then(Value::as_u64);
    assert_eq!(echoed_source_type, Some(SOURCE_TYPE_LOCAL_SYSTEM));

    // The first echoed selection entry is expected to be present and null.
    let echoed_selection = request.echo.pointer("/selections/__logs_sources/0");
    assert!(echoed_selection.is_some_and(Value::is_null));

    let system_journal = Path::new("/var/log/journal/machine/system.journal");
    let user_journal = Path::new("/var/log/journal/machine/user-1000.journal");
    assert!(request.matches_source(system_journal, None));
    assert!(!request.matches_source(user_journal, None));
}
6494
/// A file named like a user journal is rejected by `matches_source` when no
/// metadata is passed, yet file selection keeps it once the caller state
/// classifies it as a local system source.
#[test]
fn source_selection_uses_caller_metadata_before_filename_fallback() {
    let tmp = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(tmp.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
    let journal_path = tmp.path().join("user-1000.journal");

    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_001,
        "selections": {
            "__logs_sources": ["all-local-system-logs"]
        }
    });
    let request =
        NetdataRequest::parse(&payload, &config).expect("parse source-filtered request");

    // With no metadata supplied, the user-journal path does not match the filter.
    assert!(!request.matches_source(&journal_path, None));

    // Caller state overrides the classification for this exact path.
    let caller_metadata = NetdataJournalFileMetadata {
        source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
        source_name: Some("system-registry".to_string()),
        ..NetdataJournalFileMetadata::default()
    };
    let mut caller_state = TestNetdataState::default();
    caller_state
        .metadata
        .insert(journal_path.clone(), caller_metadata);
    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut caller_state),
        ..NetdataFunctionRunOptions::default()
    };

    let selection = select_journal_files_for_request(
        vec![journal_path],
        &request,
        config.reader_options,
        &run_options,
    );

    assert_eq!(selection.files.len(), 1);
}
6532
/// Running a query over a journal whose entry carries a
/// `_SOURCE_REALTIME_TIMESTAMP` 30 seconds before its commit timestamp must
/// record exactly one update in the caller state, with a value of 30_000_000.
#[test]
fn netdata_function_state_receives_learned_source_realtime_delta() {
    let tmp = TempDir::new().expect("tempdir");

    let commit_usec = 1_700_000_030_000_000;
    let source_usec = commit_usec - 30_000_000;
    write_source_realtime_delta_journal(tmp.path(), "system.journal", commit_usec, source_usec);

    // The requested window brackets the commit timestamp by one second each side.
    let payload = json!({
        "after": 1_700_000_029,
        "before": 1_700_000_031,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 1,
        "sampling": 0
    });

    let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let mut recorded = TestNetdataState::default();
    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut recorded),
        ..NetdataFunctionRunOptions::from_timeout_seconds(0)
    };

    let response = function
        .run_directory_request_json_with_options(tmp.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(recorded.updates.len(), 1);
    assert_eq!(recorded.updates[0].1, 30_000_000);
}
6567
/// Pins the source-type bit mask that `journal_file_source_type` derives from
/// the path for system, user, other, namespace and remote journal paths.
#[test]
fn source_classification_matches_plugin_filename_shape() {
    let classify = |path: &str| journal_file_source_type(Path::new(path));
    // Every local variant shares these two bits.
    let local = SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL;

    assert_eq!(
        classify("/var/log/journal/machine/system.journal"),
        local | SOURCE_TYPE_LOCAL_SYSTEM
    );
    assert_eq!(
        classify("/var/log/journal/machine/user-1000.journal"),
        local | SOURCE_TYPE_LOCAL_USER
    );
    assert_eq!(
        classify("/var/log/journal/machine/other.journal"),
        local | SOURCE_TYPE_LOCAL_OTHER
    );
    assert_eq!(
        classify("/var/log/journal/machine.namespace/system.journal"),
        local | SOURCE_TYPE_LOCAL_NAMESPACE
    );
    assert_eq!(
        classify("/var/log/journal/remote/remote-host-a@machine.journal"),
        SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
    );
}
6595
/// Pins the exact source name `journal_file_exact_source_name` produces for a
/// namespace directory and for two remote journal file names.
#[test]
fn exact_source_names_follow_plugin_prefixes() {
    let exact_name = |path: &str| journal_file_exact_source_name(Path::new(path));

    let namespace_journal = "/var/log/journal/machine.namespace/system.journal";
    assert_eq!(
        exact_name(namespace_journal).as_deref(),
        Some("namespace-namespace")
    );

    let remote_active = "/var/log/journal/remote/remote-host-a@machine.journal";
    assert_eq!(exact_name(remote_active).as_deref(), Some("remote-host-a"));

    let remote_disposed = "/var/log/journal/remote/remote-host-b.journal~.zst";
    assert_eq!(exact_name(remote_disposed).as_deref(), Some("remote-host-b"));
}
6620
/// `.journal`, `.journal~`, and both of their `.zst` variants must all be
/// accepted as journal file names.
#[test]
fn disposed_journal_extension_matches_plugin_scan_contract() {
    let active = Path::new("active.journal");
    let archived = Path::new("archived.journal~");
    let active_zst = Path::new("active.journal.zst");
    let archived_zst = Path::new("archived.journal~.zst");

    assert!(is_journal_file_name(active));
    assert!(is_journal_file_name(archived));
    assert!(is_journal_file_name(active_zst));
    assert!(is_journal_file_name(archived_zst));
}
6628
6629 fn test_located_row(realtime_usec: u64) -> LocatedRow {
6630 LocatedRow {
6631 file_path: PathBuf::from("test.journal"),
6632 row: ExplorerRow {
6633 realtime_usec,
6634 cursor: String::new(),
6635 payloads: Vec::new(),
6636 },
6637 }
6638 }
6639
6640 fn write_source_realtime_delta_journal(
6641 directory: &std::path::Path,
6642 name: &str,
6643 commit_realtime_usec: u64,
6644 source_realtime_usec: u64,
6645 ) {
6646 std::fs::create_dir_all(directory).expect("create journal dir");
6647 let path = directory.join(name);
6648 let repo_file = RepoFile::from_path(&path).expect("repo file");
6649 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6650 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6651 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6652 let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6653 let payloads: [&[u8]; 4] = [
6654 b"MESSAGE=source lag test".as_slice(),
6655 b"SERVICE=delta".as_slice(),
6656 b"PRIORITY=6".as_slice(),
6657 source.as_bytes(),
6658 ];
6659 writer
6660 .add_entry(
6661 &mut file,
6662 &payloads,
6663 commit_realtime_usec,
6664 commit_realtime_usec,
6665 )
6666 .expect("write entry");
6667 file.sync().expect("sync journal");
6668 }
6669}