1use crate::explorer::{
2 ExplorerSamplingState, empty_histogram_for_query, histogram_bucket_count_for_query,
3};
4use crate::{
5 Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
6 ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
7 ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
8 FileReader, ReaderOptions, Result, SdkError,
9};
10use chrono::{DateTime, Utc};
11use serde_json::{Map, Value, json};
12use std::cell::RefCell;
13use std::cmp::{Ordering, Reverse};
14use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
15#[cfg(unix)]
16use std::ffi::CStr;
17use std::path::{Path, PathBuf};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
/// Function name stored in `NetdataFunctionConfig::function_name` by
/// `NetdataFunctionConfig::systemd_journal()`.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
// The request defaults and limits below are not referenced in this part of the
// file. NOTE(review): presumably consumed by request parsing and query
// construction further down — confirm against their use sites.
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
/// Three 365-day years expressed in seconds.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
/// One hundred 365-day years expressed in seconds. Used as the timeout when the
/// caller passes `0` to `from_timeout_seconds` and by the default run options.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
/// Default journal-vs-realtime delta handed to `to_explorer_query` when no
/// per-file value is in play (5,000,000; microseconds per the `_USEC` suffix).
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
/// 2 * 60 * 1,000,000 — two minutes if the unit is microseconds, as the name says.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
/// Facet option id emitted for the zero-count empty value in `build_facets`.
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
/// Facet option name emitted for the zero-count empty value in `build_facets`.
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
// NOTE(review): the three limits below are not used in the visible part of the
// file; their names suggest facet-value truncation and directory-scan bounds.
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Source-type flags: each constant occupies its own bit (bits 0 through 6), so
// several of them can be combined in a single `u64` mask.
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
43
// Public aliases of the private `SOURCE_TYPE_*` bit flags; each one has exactly
// the same value as the private constant it is initialised from.
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
51
/// Fixed parameter names that `accepted_params_from_fields` always lists first
/// in the `accepted_params` array, ahead of any facet field names.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];
70
/// Field names copied into `NetdataFunctionConfig::default_view_keys` by
/// `NetdataFunctionConfig::systemd_journal()`. `build_columns` places these
/// right after the `timestamp` and `rowOptions` columns, in this order.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];
95
/// Field names copied into `NetdataFunctionConfig::default_facets` by
/// `NetdataFunctionConfig::systemd_journal()`.
///
/// NOTE(review): presumably the facet set used when a request does not name
/// its own facets — the consumer is not visible in this part of the file.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
156
/// Static configuration of a [`NetdataJournalFunction`].
///
/// The whole struct is handed to `NetdataRequest::parse`, and individual
/// fields are read while exploring files and building the response.
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Name of the function (`"systemd-journal"` for the built-in config).
    pub function_name: String,
    /// Facet field names. NOTE(review): presumably the facets used when the
    /// request does not specify any — confirm in request parsing.
    pub default_facets: Vec<String>,
    /// Column names that `build_columns` places right after `timestamp` and
    /// `rowOptions`.
    pub default_view_keys: Vec<String>,
    /// Optional histogram field. NOTE(review): presumably the fallback when
    /// the request names none — confirm in request parsing.
    pub default_histogram: Option<String>,
    /// Options passed to `FileReader::open_with_options` and to the helpers
    /// that open journal files.
    pub reader_options: ReaderOptions,
    /// Strategy passed to `explore_with_strategy_cursor_rows_controlled`.
    pub explorer_strategy: ExplorerStrategy,
}
166
167impl NetdataFunctionConfig {
168 pub fn systemd_journal() -> Self {
169 Self {
170 function_name: DEFAULT_FUNCTION_NAME.to_string(),
171 default_facets: SYSTEMD_DEFAULT_FACETS
172 .iter()
173 .map(|field| (*field).to_string())
174 .collect(),
175 default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
176 .iter()
177 .map(|field| (*field).to_string())
178 .collect(),
179 default_histogram: Some("PRIORITY".to_string()),
180 reader_options: ReaderOptions::snapshot(),
181 explorer_strategy: ExplorerStrategy::Traversal,
182 }
183 }
184}
185
186impl Default for NetdataFunctionConfig {
187 fn default() -> Self {
188 Self::systemd_journal()
189 }
190}
191
/// Per-response state made available to [`NetdataFunctionProfile`] when it
/// renders raw field values.
#[derive(Debug, Default)]
pub struct DisplayContext {
    /// Filled from `collect_boot_first_realtime` when a query response is
    /// built. NOTE(review): looks like boot id -> first realtime timestamp;
    /// confirm the key encoding and unit at the producer.
    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    /// String-to-string cache behind a `RefCell`, so it can be updated through
    /// a shared reference. NOTE(review): presumably uid -> display name.
    uid_display_cache: RefCell<BTreeMap<String, String>>,
    /// Same shape as `uid_display_cache`. NOTE(review): presumably gid ->
    /// display name.
    gid_display_cache: RefCell<BTreeMap<String, String>>,
}
198
/// Tells a profile which part of the response a value is being rendered for.
#[derive(Debug, Clone, Copy)]
pub enum DisplayScope {
    /// A cell of a data row (used by `build_data_rows`).
    Data,
    /// The name of a facet option (used by `facet_option_name`).
    Facet,
    /// The name of a histogram dimension (used by `histogram_chart_metadata`).
    Histogram,
}
205
206pub trait NetdataFunctionProfile {
207 fn field_display_value(
208 &self,
209 _context: &DisplayContext,
210 _scope: DisplayScope,
211 _field: &str,
212 value: &[u8],
213 ) -> Value {
214 Value::String(String::from_utf8_lossy(value).into_owned())
215 }
216
217 fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
218 match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
219 Value::String(value) => value,
220 other => other.to_string(),
221 }
222 }
223
224 fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
225 if let Some(priority) = first_value(fields, "PRIORITY") {
226 return json!({ "severity": priority_to_row_severity(priority) });
227 }
228 json!({ "severity": "normal" })
229 }
230}
231
/// Profile whose `field_display_value` calls `systemd_field_display_value`
/// with the trailing flag set to `false`. This is the default profile type of
/// [`NetdataJournalFunction`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalProfile;
234
/// Profile whose `field_display_value` calls `systemd_field_display_value`
/// with the trailing flag set to `true`. It is used by
/// `NetdataJournalFunction::systemd_journal_plugin_compatible()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalPluginProfile;
237
impl NetdataFunctionProfile for SystemdJournalProfile {
    /// Delegates to `systemd_field_display_value`, passing `false` as its last
    /// argument. The other trait methods keep their default implementations.
    // NOTE(review): the meaning of the boolean is defined by
    // `systemd_field_display_value`, which is outside this part of the file.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, false)
    }
}
249
impl NetdataFunctionProfile for SystemdJournalPluginProfile {
    /// Delegates to `systemd_field_display_value`, passing `true` as its last
    /// argument. The other trait methods keep their default implementations.
    // NOTE(review): the meaning of the boolean is defined by
    // `systemd_field_display_value`, which is outside this part of the file.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, true)
    }
}
261
/// A Netdata function over journal files: a [`NetdataFunctionConfig`] paired
/// with a display profile `P` (defaulting to [`SystemdJournalProfile`]).
///
/// The request-handling methods are available when `P` implements
/// [`NetdataFunctionProfile`].
#[derive(Debug, Clone)]
pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    /// Configuration used for request parsing, file reading and exploration.
    config: NetdataFunctionConfig,
    /// Renders raw field values, facet option names and row options.
    profile: P,
}
267
/// Progress snapshot passed to `NetdataFunctionRunOptions::progress_callback`.
///
/// NOTE(review): the values are filled in by progress helpers outside this
/// part of the file; the field notes below are read from names and types.
#[derive(Debug, Clone)]
pub struct NetdataFunctionProgress {
    /// Position of the file being processed (base not visible here — confirm
    /// whether it is zero- or one-based).
    pub current_file: usize,
    /// Number of files selected for the run.
    pub total_files: usize,
    /// Files opened and explored so far.
    pub matched_files: u64,
    /// Files skipped so far.
    pub skipped_files: u64,
    /// Explorer statistics.
    pub stats: ExplorerStats,
    /// Time elapsed since the run started.
    pub elapsed: Duration,
}
277
/// Optional, externally supplied facts about one journal file, as returned by
/// [`NetdataFunctionState::file_metadata`]. Every field may be absent, and
/// `Default` yields a value with all fields `None`.
///
/// NOTE(review): the units below are taken from the `_usec` suffixes
/// (microseconds); the consumers are outside this part of the file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetdataJournalFileMetadata {
    /// Source type — presumably a mask of the `NETDATA_SOURCE_TYPE_*` flags.
    pub source_type: Option<u64>,
    /// Name of the source the file belongs to.
    pub source_name: Option<String>,
    /// Last modification time of the file.
    pub file_last_modified_usec: Option<u64>,
    /// Realtime timestamp of the first message in the file.
    pub msg_first_realtime_usec: Option<u64>,
    /// Realtime timestamp of the last message in the file.
    pub msg_last_realtime_usec: Option<u64>,
    /// Journal-vs-realtime delta recorded for the file.
    pub journal_vs_realtime_delta_usec: Option<u64>,
}
287
/// Caller-provided state that can supply per-file metadata and receive
/// per-file delta updates. Both methods have no-op defaults.
pub trait NetdataFunctionState {
    /// Returns known metadata for the file at `_path`. The default knows
    /// nothing and returns `None`.
    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
        None
    }

    /// Receives a journal-vs-realtime delta for the file at `_path`. The
    /// default ignores it.
    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
}
295
/// Per-run options for executing a request: timeout, callbacks and optional
/// caller state.
pub struct NetdataFunctionRunOptions<'a> {
    /// Maximum run time. It is converted to a deadline (`Instant::now()` plus
    /// this duration) before files are explored; `None` means no deadline.
    pub timeout: Option<Duration>,
    /// Optional callback that receives [`NetdataFunctionProgress`] snapshots.
    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional callback returning a `bool`, forwarded to the explorer
    /// control. NOTE(review): presumably `true` requests cancellation.
    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller state (see [`NetdataFunctionState`]).
    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Progress interval forwarded to the explorer control (250 ms in the
    /// constructors defined for this type).
    pub progress_interval: Duration,
}
303
304impl NetdataFunctionRunOptions<'_> {
305 pub fn from_timeout_seconds(seconds: u64) -> Self {
306 let seconds = if seconds == 0 {
307 EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
308 } else {
309 seconds
310 };
311 Self {
312 timeout: Some(Duration::from_secs(seconds)),
313 progress_callback: None,
314 cancellation_callback: None,
315 state: None,
316 progress_interval: Duration::from_millis(250),
317 }
318 }
319}
320
321impl Default for NetdataFunctionRunOptions<'_> {
322 fn default() -> Self {
323 Self {
324 timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
325 progress_callback: None,
326 cancellation_callback: None,
327 state: None,
328 progress_interval: Duration::from_millis(250),
329 }
330 }
331}
332
333impl NetdataJournalFunction<SystemdJournalProfile> {
334 pub fn systemd_journal() -> Self {
335 Self {
336 config: NetdataFunctionConfig::systemd_journal(),
337 profile: SystemdJournalProfile,
338 }
339 }
340}
341
342impl NetdataJournalFunction<SystemdJournalPluginProfile> {
343 pub fn systemd_journal_plugin_compatible() -> Self {
344 Self {
345 config: NetdataFunctionConfig::systemd_journal(),
346 profile: SystemdJournalPluginProfile,
347 }
348 }
349}
350
351impl<P> NetdataJournalFunction<P>
352where
353 P: NetdataFunctionProfile,
354{
355 pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
356 Self { config, profile }
357 }
358
359 pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
360 self.run_directory_request_json_with_options(
361 directory,
362 request,
363 NetdataFunctionRunOptions::default(),
364 )
365 }
366
367 pub fn run_directory_request_json_with_options(
368 &self,
369 directory: &Path,
370 request: &Value,
371 mut options: NetdataFunctionRunOptions<'_>,
372 ) -> Result<Value> {
373 let request = NetdataRequest::parse(request, &self.config)?;
374 let collection = collect_journal_files(directory)?;
375 let paths = collection.files;
376 if request.info {
377 return Ok(self.info_response(request.echo, &paths, &options));
378 }
379 let annotation_paths = paths.clone();
380
381 let selected =
382 select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
383 if let Some(response) = not_modified_before_scan_response(&request, &selected) {
384 return Ok(response);
385 }
386 let selected_files = selected.files;
387 let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
388 let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
389 self.finalize_combined_result(
390 &request,
391 &selected_files,
392 deadline,
393 &mut options,
394 &mut combined,
395 collection.skipped,
396 collection.errors,
397 )?;
398 Ok(self.query_response(request, &annotation_paths, combined))
399 }
400
401 fn finalize_combined_result(
402 &self,
403 request: &NetdataRequest,
404 selected_files: &[SelectedJournalFile],
405 deadline: Option<Instant>,
406 options: &mut NetdataFunctionRunOptions<'_>,
407 combined: &mut CombinedResult,
408 skipped_files: u64,
409 file_errors: Vec<String>,
410 ) -> Result<()> {
411 combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
412 combined.file_errors.extend(file_errors);
413 if combined.cancelled {
414 return Ok(());
415 }
416 if !request.data_only {
417 combined.add_zero_count_facet_values_from_files(
418 &request.facets,
419 self.config.reader_options,
420 );
421 combined.add_zero_count_selected_filter_values(request);
422 }
423 if should_collect_unfiltered_facet_vocabulary(request, combined) {
424 let vocabulary = self.explore_files(
425 selected_files,
426 &request.unfiltered_vocabulary(),
427 deadline,
428 options,
429 )?;
430 combined.add_zero_count_facet_values(&vocabulary.facets);
431 }
432 Ok(())
433 }
434
435 pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
436 self.run_directory_request_bytes_with_options(
437 directory,
438 request,
439 NetdataFunctionRunOptions::default(),
440 )
441 }
442
443 pub fn run_directory_request_bytes_with_options(
444 &self,
445 directory: &Path,
446 request: &[u8],
447 options: NetdataFunctionRunOptions<'_>,
448 ) -> Result<Value> {
449 let request: Value = serde_json::from_slice(request).map_err(|err| {
450 SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
451 })?;
452 self.run_directory_request_json_with_options(directory, &request, options)
453 }
454
    /// Explores the selected files one after another and accumulates the
    /// outcome into a single `CombinedResult`.
    ///
    /// The page window, the sampling state and the realtime adjuster are
    /// created once and shared by every file in the loop. A file that cannot
    /// be opened, or whose exploration result is not recorded, is skipped with
    /// `continue`; the loop ends early when `should_stop_before_file` or
    /// `finish_explored_file` says so.
    ///
    /// # Errors
    ///
    /// Propagates errors from `open_file_for_explore` and
    /// `finish_explored_file`.
    fn explore_files(
        &self,
        files: &[SelectedJournalFile],
        request: &NetdataRequest,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
    ) -> Result<CombinedResult> {
        // Request-level query: only used to decide whether sampling is enabled
        // and to size the sampling state. Each file gets its own query below.
        let query = request.to_explorer_query(
            files.len() as u64,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let mut combined = CombinedResult::default();
        // `RefCell` because the window is read from callbacks while the file
        // is being explored and consumed after the loop.
        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
        combined.sampling_enabled = query.sampling.is_some();
        let mut sampling_state =
            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
        let started = Instant::now();
        let total_files = files.len();
        for (file_index, file) in files.iter().enumerate() {
            let path = &file.path;
            if should_stop_before_file(&mut combined, deadline, options) {
                break;
            }
            // `Ok(None)` means the file could not be opened; the failure has
            // already been recorded in `combined`.
            let Some(mut reader) = self.open_file_for_explore(
                path,
                &mut combined,
                options,
                progress_context(file_index, total_files, started),
            )?
            else {
                continue;
            };
            combined.matched_files = combined.matched_files.saturating_add(1);
            combined.matched_paths.push(path.clone());
            // Per-file query; intentionally shadows the request-level one.
            let query = request.file_query(files.len(), reader.header(), &file.order);
            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
            let explored = self.explore_single_file(
                &mut reader,
                request,
                &query,
                deadline,
                options,
                &combined,
                &page_window,
                sampling_state.as_mut(),
                &realtime_adjuster,
                progress_context(file_index, total_files, started),
                file.order.journal_vs_realtime_delta_usec,
            );
            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
            else {
                continue;
            };
            // A `true` return means no further files should be explored.
            if finish_explored_file(
                options,
                request,
                file,
                &query,
                result,
                stop_reason,
                &mut combined,
                files,
                file_index,
                progress_context(file_index, total_files, started),
            )? {
                break;
            }
        }
        combined.expand_row_payloads(self.config.reader_options);
        combined.page_counters = Some(page_window.into_inner().counters());
        Ok(combined)
    }
529
530 fn open_file_for_explore(
531 &self,
532 path: &Path,
533 combined: &mut CombinedResult,
534 options: &mut NetdataFunctionRunOptions<'_>,
535 progress: ProgressContext,
536 ) -> Result<Option<FileReader>> {
537 match FileReader::open_with_options(path, self.config.reader_options) {
538 Ok(reader) => Ok(Some(reader)),
539 Err(err) => {
540 combined.skipped_files = combined.skipped_files.saturating_add(1);
541 combined
542 .file_errors
543 .push(format!("{}: {err}", path.display()));
544 emit_progress_for_combined(options, combined, progress);
545 Ok(None)
546 }
547 }
548 }
549
    /// Explores one opened file with the configured strategy and returns the
    /// explorer result together with the stop reason reported by the control.
    ///
    /// An `ExplorerControl` is set up with the deadline, the cancellation
    /// callback, the progress interval and callback, the shared sampling
    /// state, and three row-level callbacks:
    /// - candidate row: asks the page window whether the row should be kept;
    /// - realtime adjust: runs the timestamp through the shared adjuster;
    /// - matched row: forwards to `delta_scan_can_stop`.
    ///
    /// # Errors
    ///
    /// Propagates the error of `explore_with_strategy_cursor_rows_controlled`.
    #[allow(clippy::too_many_arguments)]
    fn explore_single_file(
        &self,
        reader: &mut FileReader,
        request: &NetdataRequest,
        query: &ExplorerQuery,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &CombinedResult,
        page_window: &RefCell<NetdataPageWindow>,
        sampling_state: Option<&mut ExplorerSamplingState>,
        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
        progress: ProgressContext,
        realtime_delta_usec: u64,
    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
        // Read these out of `options` first: the progress closure below
        // captures `options` itself.
        let cancellation_callback = options.cancellation_callback;
        let progress_interval = options.progress_interval;
        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
            emit_explorer_progress(options, combined, explorer_progress, progress);
        };
        let mut control = ExplorerControl::new();
        control.set_deadline(deadline);
        control.set_cancellation_callback(cancellation_callback);
        control.set_progress_interval(progress_interval);
        control.set_progress_callback(Some(&mut explorer_progress));
        control.set_sampling_state(sampling_state);
        // Shared borrow only: the window is consulted here, not modified.
        let mut candidate_row =
            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
        control.set_candidate_row_callback(Some(&mut candidate_row));
        // Mutable borrow: `adjust` is called through `borrow_mut`.
        let mut adjust_realtime =
            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
        let mut matched_row = |realtime_usec, rows_matched| {
            delta_scan_can_stop(
                request,
                page_window,
                realtime_usec,
                rows_matched,
                realtime_delta_usec,
            )
        };
        control.set_matched_row_callback(Some(&mut matched_row));
        let result = reader.explore_with_strategy_cursor_rows_controlled(
            query,
            self.config.explorer_strategy,
            &mut control,
        )?;
        Ok((result, control.stop_reason()))
    }
599
600 fn info_response(
601 &self,
602 echo: Value,
603 paths: &[PathBuf],
604 options: &NetdataFunctionRunOptions<'_>,
605 ) -> Value {
606 json!({
607 "_request": echo,
608 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
609 "v": 3,
610 "accepted_params": self.accepted_params_from_fields(&[]),
611 "required_params": self.required_source_params(paths, options),
612 "show_ids": true,
613 "has_history": true,
614 "pagination": {
615 "enabled": true,
616 "key": "anchor",
617 "column": "timestamp",
618 "units": "timestamp_usec",
619 },
620 "status": 200,
621 "type": "table",
622 "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
623 })
624 }
625
626 fn query_response(
627 &self,
628 request: NetdataRequest,
629 annotation_paths: &[PathBuf],
630 combined: CombinedResult,
631 ) -> Value {
632 if combined.cancelled {
633 return netdata_function_error(499, "Request cancelled.");
634 }
635 let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
636 let mut response = base_query_response(&request, &combined, &artifacts);
637 let Some(object) = response.as_object_mut() else {
638 return netdata_function_error(500, "Internal Netdata function response error.");
639 };
640 self.add_query_response_metadata(object, &request, &combined, artifacts);
641 response
642 }
643
644 fn query_response_artifacts(
645 &self,
646 request: &NetdataRequest,
647 annotation_paths: &[PathBuf],
648 combined: &CombinedResult,
649 ) -> QueryResponseArtifacts {
650 let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
651 let reportable_facet_field_names = string_fields(&reportable_facet_fields);
652 let columns = self.build_columns(
653 &request,
654 &reportable_facet_fields,
655 &combined.rows,
656 &combined.facets,
657 &combined.column_fields,
658 );
659 let boot_ids = response_boot_ids(
660 &columns.order,
661 &combined.rows,
662 &combined.facets,
663 combined.histogram.as_ref(),
664 );
665 let context = DisplayContext {
666 boot_first_realtime: collect_boot_first_realtime(
667 annotation_paths,
668 self.config.reader_options,
669 &boot_ids,
670 ),
671 ..DisplayContext::default()
672 };
673 let data =
674 self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
675 let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
676 let histogram = self.query_histogram_artifact(request, combined, &context);
677 let message = query_message(combined.timed_out, &combined.stats);
678 let items = response_items(request, combined, data.len() as u64);
679 QueryResponseArtifacts {
680 reportable_facet_field_names,
681 columns,
682 data,
683 facets,
684 histogram,
685 message,
686 items,
687 }
688 }
689
690 fn add_query_response_metadata(
691 &self,
692 object: &mut Map<String, Value>,
693 request: &NetdataRequest,
694 combined: &CombinedResult,
695 artifacts: QueryResponseArtifacts,
696 ) {
697 if !request.data_only {
698 self.add_full_query_response_metadata(object, request, combined, &artifacts);
699 } else if request.histogram.is_some() {
700 object.insert(
701 "available_histograms".to_string(),
702 self.available_histograms(request, combined),
703 );
704 }
705 add_last_modified_if_needed(object, request, combined);
706 add_sampling_if_needed(object, combined);
707 add_analysis_outputs_if_needed(object, request, artifacts);
708 }
709
710 fn query_histogram_artifact(
711 &self,
712 request: &NetdataRequest,
713 combined: &CombinedResult,
714 context: &DisplayContext,
715 ) -> Option<Value> {
716 if let Some(histogram) = combined.histogram.as_ref() {
717 return Some(self.build_histogram(
718 context,
719 histogram,
720 combined.facets.get(&histogram.field),
721 ));
722 }
723 let histogram_field = request.histogram.as_ref()?;
724 if request.data_only && !request.delta {
725 return None;
726 }
727 let query = request.to_explorer_query(
728 combined.matched_files,
729 None,
730 NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
731 );
732 let histogram = empty_histogram_for_query(histogram_field.as_bytes(), &query);
733 Some(self.build_histogram(
734 context,
735 &histogram,
736 combined.facets.get(histogram.field.as_slice()),
737 ))
738 }
739
740 fn add_full_query_response_metadata(
741 &self,
742 object: &mut Map<String, Value>,
743 request: &NetdataRequest,
744 combined: &CombinedResult,
745 artifacts: &QueryResponseArtifacts,
746 ) {
747 object.insert("message".to_string(), artifacts.message.clone());
748 object.insert("update_every".to_string(), Value::from(1));
749 object.insert("help".to_string(), Value::Null);
750 object.insert(
751 "accepted_params".to_string(),
752 self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
753 );
754 object.insert("default_sort_column".to_string(), Value::from("timestamp"));
755 object.insert("default_charts".to_string(), Value::Array(Vec::new()));
756 object.insert(
757 "available_histograms".to_string(),
758 self.available_histograms(request, combined),
759 );
760 }
761
762 fn build_columns(
763 &self,
764 request: &NetdataRequest,
765 reportable_facet_fields: &[Vec<u8>],
766 rows: &[LocatedRow],
767 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
768 column_fields: &BTreeSet<String>,
769 ) -> Columns {
770 let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
771 push_unique_many(&mut order, &self.config.default_view_keys);
772 push_unique_many(&mut order, &string_fields(reportable_facet_fields));
773 if let Some(histogram) = &request.histogram {
774 push_unique(&mut order, histogram);
775 }
776 for field in column_fields {
777 push_unique(&mut order, field);
778 }
779
780 for (field, values) in facets {
781 if !facet_group_is_reportable(values) {
782 continue;
783 }
784 push_unique(&mut order, &String::from_utf8_lossy(field));
785 }
786 for row in rows {
787 let fields = row_fields(row);
788 for field in fields.keys() {
789 push_unique(&mut order, field);
790 }
791 }
792
793 let mut map = Map::new();
794 for (index, key) in order.iter().enumerate() {
795 map.insert(key.clone(), column_metadata(key, index));
796 }
797 Columns { order, map }
798 }
799
800 fn build_data_rows(
801 &self,
802 context: &DisplayContext,
803 column_order: &[String],
804 rows: &[LocatedRow],
805 direction: Direction,
806 ) -> Vec<Value> {
807 let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
808 Direction::Forward => Box::new(rows.iter().rev()),
809 Direction::Backward => Box::new(rows.iter()),
810 };
811 row_iter
812 .map(|located| {
813 let fields = row_fields(located);
814 let mut row = Vec::with_capacity(column_order.len());
815 for column in column_order {
816 let value = match column.as_str() {
817 "timestamp" => Value::from(located.row.realtime_usec),
818 "rowOptions" => self.profile.row_options(&fields),
819 field => first_value(&fields, field)
820 .map(|value| {
821 self.profile.field_display_value(
822 context,
823 DisplayScope::Data,
824 field,
825 value,
826 )
827 })
828 .unwrap_or(Value::Null),
829 };
830 row.push(value);
831 }
832 Value::Array(row)
833 })
834 .collect()
835 }
836
837 fn build_facets(
838 &self,
839 context: &DisplayContext,
840 requested: &[Vec<u8>],
841 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
842 ) -> Value {
843 let mut out = Vec::new();
844 for (order, field) in requested.iter().enumerate() {
845 let values = facets.get(field);
846 let field_name = String::from_utf8_lossy(field).into_owned();
847 let mut options: Vec<_> = values
848 .into_iter()
849 .flat_map(|values| values.iter())
850 .filter(|(value, count)| {
851 (!value.is_empty() && value.as_slice() != b"-")
852 || (**count == 0 && value.is_empty())
853 })
854 .map(|(value, count)| {
855 if *count == 0 && value.is_empty() {
856 return json!({
857 "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
858 "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
859 "count": count,
860 });
861 }
862 json!({
863 "id": String::from_utf8_lossy(value).into_owned(),
864 "name": self.profile.facet_option_name(context, &field_name, value),
865 "count": count,
866 })
867 })
868 .collect();
869 sort_facet_options(&field_name, &mut options);
870 for (idx, option) in options.iter_mut().enumerate() {
871 if let Some(object) = option.as_object_mut() {
872 object.insert("order".to_string(), Value::from((idx + 1) as u64));
873 }
874 }
875 out.push(json!({
876 "id": field_name,
877 "name": String::from_utf8_lossy(field).into_owned(),
878 "order": order + 1,
879 "options": options,
880 }));
881 }
882 Value::Array(out)
883 }
884
885 fn build_histogram(
886 &self,
887 context: &DisplayContext,
888 histogram: &ExplorerHistogram,
889 known_values: Option<&BTreeMap<Vec<u8>, u64>>,
890 ) -> Value {
891 let field = String::from_utf8_lossy(&histogram.field).into_owned();
892 let mut dimension_ids = BTreeSet::new();
893 let mut buckets = Vec::with_capacity(histogram.buckets.len());
894 for bucket in &histogram.buckets {
895 let mut values = BTreeMap::new();
896 for (value, count) in &bucket.values {
897 add_netdata_facet_count(&mut values, value, *count);
898 }
899 for value in values.keys() {
900 dimension_ids.insert(value.clone());
901 }
902 buckets.push((bucket.start_realtime_usec, values));
903 }
904 let actual_dimension_ids = dimension_ids.clone();
905 if let Some(known_values) = known_values {
906 for value in known_values.keys() {
907 if value.is_empty() || value.as_slice() == b"-" {
908 continue;
909 }
910 dimension_ids.insert(value.clone());
911 }
912 }
913 let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
914 let metadata = self.histogram_chart_metadata(
915 context,
916 &field,
917 &buckets,
918 &actual_dimension_ids,
919 &dimension_ids,
920 );
921 let data: Vec<Value> = buckets
922 .iter()
923 .map(|(start_realtime_usec, values)| {
924 let mut point = Vec::with_capacity(dimension_ids.len() + 1);
925 point.push(Value::from(start_realtime_usec / 1000));
926 for value in &dimension_ids {
927 let count = values
928 .get(value)
929 .copied()
930 .map(Value::from)
931 .unwrap_or_else(|| {
932 if actual_dimension_ids.contains(value) {
933 Value::from(0)
934 } else {
935 Value::Null
936 }
937 });
938 point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
939 }
940 Value::Array(point)
941 })
942 .collect();
943
944 json!({
945 "id": field,
946 "name": field,
947 "chart": {
948 "summary": metadata.summary,
949 "totals": metadata.totals,
950 "result": {
951 "labels": metadata.result_labels,
952 "point": { "value": 0, "arp": 1, "pa": 2 },
953 "data": data,
954 },
955 "db": {
956 "tiers": 1,
957 "update_every": histogram_update_every_seconds(histogram),
958 "units": "events",
959 "dimensions": {
960 "ids": metadata.ids.clone(),
961 "names": metadata.names.clone(),
962 "units": metadata.units.clone(),
963 "sts": metadata.stats.clone(),
964 },
965 "per_tier": [{
966 "tier": 0,
967 "queries": 1,
968 "points": metadata.points,
969 "update_every": histogram_update_every_seconds(histogram),
970 }],
971 },
972 "view": {
973 "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
974 "update_every": histogram_update_every_seconds(histogram),
975 "after": histogram_after_seconds(histogram),
976 "before": histogram_before_seconds(histogram),
977 "units": "events",
978 "chart_type": "stackedBar",
979 "dimensions": {
980 "grouped_by": ["dimension"],
981 "ids": metadata.ids,
982 "names": metadata.names,
983 "colors": metadata.colors,
984 "units": metadata.units,
985 "sts": metadata.stats,
986 },
987 "min": metadata.min,
988 "max": metadata.max,
989 },
990 "agents": [{
991 "mg": "default",
992 "nm": "facets.histogram",
993 "now": now_unix_seconds(),
994 "ai": 0,
995 }],
996 }
997 })
998 }
999
    /// Computes the per-dimension and overall metadata of a histogram chart.
    ///
    /// `dimensions` lists every dimension to report, in output order.
    /// `actual_dimensions` is passed to `histogram_dimension_stats`, which
    /// reports per dimension whether it is "actual"; only actual dimensions
    /// contribute to the overall `min`, `max` and `points`.
    ///
    /// For each dimension the result carries its id, display name (rendered by
    /// the profile with `DisplayScope::Histogram`), a `null` colour, the unit
    /// `"events"`, and min/max/average/contribution statistics. The output
    /// also contains the summary, the totals and the result labels, which
    /// start with `"time"`.
    fn histogram_chart_metadata(
        &self,
        context: &DisplayContext,
        field: &str,
        buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
        actual_dimensions: &BTreeSet<Vec<u8>>,
        dimensions: &[Vec<u8>],
    ) -> HistogramChartMetadata {
        let mut ids = Vec::with_capacity(dimensions.len());
        let mut names = Vec::with_capacity(dimensions.len());
        let mut colors = Vec::with_capacity(dimensions.len());
        let mut units = Vec::with_capacity(dimensions.len());
        let mut min_values = Vec::with_capacity(dimensions.len());
        let mut max_values = Vec::with_capacity(dimensions.len());
        let mut avg_values = Vec::with_capacity(dimensions.len());
        let mut arp_values = Vec::with_capacity(dimensions.len());
        let mut con_values = Vec::with_capacity(dimensions.len());
        let mut summary_dimensions = Vec::with_capacity(dimensions.len());
        let mut result_labels = vec![Value::String("time".to_string())];

        // Grand total over all dimensions and buckets; the denominator of each
        // dimension's contribution percentage.
        let total: u64 = dimensions
            .iter()
            .map(|dimension| {
                buckets
                    .iter()
                    .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
                    .sum::<u64>()
            })
            .sum();

        let mut min = 0;
        let mut max = 0;
        let mut points = 0;
        for (priority, dimension) in dimensions.iter().enumerate() {
            let id = String::from_utf8_lossy(dimension).into_owned();
            // A non-string display value falls back to its JSON text form.
            let display = match self.profile.field_display_value(
                context,
                DisplayScope::Histogram,
                field,
                dimension,
            ) {
                Value::String(value) => value,
                other => other.to_string(),
            };
            let (dimension_min, dimension_max, dimension_sum, actual) =
                histogram_dimension_stats(buckets, actual_dimensions, dimension);
            // The average is per bucket, and zero for non-actual dimensions.
            let dimension_average = if actual && !buckets.is_empty() {
                dimension_sum as f64 / buckets.len() as f64
            } else {
                0.0
            };
            if actual {
                // `points == 0` means no actual dimension has contributed yet,
                // so the first one seeds the overall minimum.
                if points == 0 || dimension_min < min {
                    min = dimension_min;
                }
                if dimension_max > max {
                    max = dimension_max;
                }
                points += buckets.len() as u64;
            }
            // Share of the grand total, as a percentage.
            let contribution = if total > 0 {
                dimension_sum as f64 * 100.0 / total as f64
            } else {
                0.0
            };

            ids.push(Value::String(id.clone()));
            names.push(Value::String(display.clone()));
            colors.push(Value::Null);
            units.push(Value::String("events".to_string()));
            result_labels.push(Value::String(display.clone()));
            min_values.push(Value::from(dimension_min));
            max_values.push(Value::from(dimension_max));
            avg_values.push(Value::from(dimension_average));
            arp_values.push(Value::from(0));
            con_values.push(Value::from(contribution));
            summary_dimensions.push(json!({
                "id": id,
                "nm": display,
                "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
                "sts": {
                    "min": dimension_min,
                    "max": dimension_max,
                    "avg": dimension_average,
                    "con": contribution,
                },
                "pri": priority as u64,
            }));
        }

        let stats = json!({
            "min": min_values,
            "max": max_values,
            "avg": avg_values,
            "arp": arp_values,
            "con": con_values,
        });
        let summary_stats = json!({
            "min": min,
            "max": max,
            "avg": histogram_average(total, points),
            "con": 100.0,
        });
        let dimensions_len = dimensions.len() as u64;

        HistogramChartMetadata {
            ids,
            names,
            colors,
            units,
            stats,
            summary: json!({
                "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
                "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
                "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
                "dimensions": summary_dimensions,
                "labels": [],
                "alerts": [],
            }),
            totals: histogram_totals(dimensions_len),
            result_labels,
            min,
            max,
            points,
        }
    }
1126
1127 fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1128 NETDATA_ACCEPTED_PARAMS
1129 .iter()
1130 .copied()
1131 .chain(fields.iter().map(String::as_str))
1132 .map(|field| Value::String(field.to_string()))
1133 .collect()
1134 }
1135
1136 fn required_source_params(
1137 &self,
1138 paths: &[PathBuf],
1139 options: &NetdataFunctionRunOptions<'_>,
1140 ) -> Value {
1141 let mut all = JournalSourceSummary::default();
1142 let mut local = JournalSourceSummary::default();
1143 let mut local_namespaces = JournalSourceSummary::default();
1144 let mut local_system = JournalSourceSummary::default();
1145 let mut local_user = JournalSourceSummary::default();
1146 let mut remote = JournalSourceSummary::default();
1147 let mut other = JournalSourceSummary::default();
1148 let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1149
1150 for path in paths {
1151 let metadata = file_metadata(options, path);
1152 let source_type = metadata
1153 .as_ref()
1154 .and_then(|metadata| metadata.source_type)
1155 .unwrap_or_else(|| journal_file_source_type(path));
1156 all.add_path(path, self.config.reader_options, metadata.as_ref());
1157 if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1158 local.add_path(path, self.config.reader_options, metadata.as_ref());
1159 }
1160 if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1161 local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1162 }
1163 if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1164 local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1165 }
1166 if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1167 local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1168 }
1169 if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1170 remote.add_path(path, self.config.reader_options, metadata.as_ref());
1171 }
1172 if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1173 other.add_path(path, self.config.reader_options, metadata.as_ref());
1174 }
1175 let source_name = metadata
1176 .as_ref()
1177 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1178 .or_else(|| journal_file_exact_source_name(path));
1179 if let Some(source_name) = source_name {
1180 exact.entry(source_name).or_default().add_path(
1181 path,
1182 self.config.reader_options,
1183 metadata.as_ref(),
1184 );
1185 }
1186 }
1187
1188 let mut source_options = Vec::new();
1189 push_source_option(&mut source_options, "all", &all);
1190 push_source_option(&mut source_options, "all-local-logs", &local);
1191 push_source_option(
1192 &mut source_options,
1193 "all-local-namespaces",
1194 &local_namespaces,
1195 );
1196 push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1197 push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1198 push_source_option(&mut source_options, "all-remote-systems", &remote);
1199 push_source_option(&mut source_options, "all-uncategorized", &other);
1200 for (name, summary) in exact {
1201 push_source_option(&mut source_options, &name, &summary);
1202 }
1203
1204 json!([{
1205 "id": "__logs_sources",
1206 "name": "Journal Sources",
1207 "help": "Select the logs source to query",
1208 "type": "multiselect",
1209 "options": source_options,
1210 }])
1211 }
1212
1213 fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1214 let mut fields = combined.reportable_facet_fields(&request.facets);
1215 if request.data_only {
1216 if let Some(histogram) = &request.histogram {
1217 push_unique(&mut fields, histogram);
1218 }
1219 }
1220 let mut sorted = fields.clone();
1221 sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1222 let order_by_field: BTreeMap<String, usize> = sorted
1223 .into_iter()
1224 .enumerate()
1225 .map(|(index, field)| (field, index + 1))
1226 .collect();
1227
1228 fields
1229 .into_iter()
1230 .map(|field| {
1231 let order = order_by_field.get(&field).copied().unwrap_or(0);
1232 json!({
1233 "id": field,
1234 "name": field,
1235 "order": order,
1236 })
1237 })
1238 .collect()
1239 }
1240}
1241
/// Per-dimension and summary metadata describing a histogram chart.
///
/// The field notes below describe how the builder visible in this file fills
/// the structure; all per-dimension vectors are pushed in the same iteration
/// order, so index `i` refers to the same dimension in each of them.
struct HistogramChartMetadata {
    // One JSON string per dimension: the raw dimension bytes decoded as lossy UTF-8.
    ids: Vec<Value>,
    // One JSON string per dimension: the display value of the dimension.
    names: Vec<Value>,
    // One entry per dimension; the builder pushes JSON null.
    colors: Vec<Value>,
    // One JSON string per dimension; the builder pushes "events".
    units: Vec<Value>,
    // JSON object with the per-dimension arrays "min", "max", "avg", "arp" and "con".
    stats: Value,
    // JSON object with "nodes", "contexts", "instances", "dimensions", "labels" and "alerts".
    summary: Value,
    // JSON object produced by `histogram_totals` for the number of dimensions.
    totals: Value,
    // "time" followed by the display value of every dimension.
    result_labels: Vec<Value>,
    // Smallest per-bucket count among the dimensions flagged as actual.
    min: u64,
    // Largest per-bucket count among the dimensions flagged as actual.
    max: u64,
    // The bucket count added once for every dimension flagged as actual.
    points: u64,
}
1255
/// Computes `(min, max, sum, actual)` of one dimension's counts across all
/// histogram buckets.
///
/// A dimension that is not listed in `actual_dimensions` yields
/// `(0, 0, 0, false)`. Otherwise every bucket contributes its count for the
/// dimension, with a missing entry counting as `0`, and the flag is `true`.
/// With no buckets at all the result is `(0, 0, 0, true)`.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }

    let mut counts = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0));

    // Seed all three accumulators from the first bucket.
    let Some(first) = counts.next() else {
        return (0, 0, 0, true);
    };
    let (lowest, highest, total) = counts.fold(
        (first, first, first),
        |(lowest, highest, total), count| (lowest.min(count), highest.max(count), total + count),
    );
    (lowest, highest, total, true)
}
1279
/// Returns `total / points` as a float, or `0.0` when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        count => total as f64 / count as f64,
    }
}
1287
1288fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1289 let mut node = json!({
1290 "mg": "default",
1291 "nm": "facets.histogram",
1292 "ni": 0,
1293 "st": { "ai": 0, "code": 200, "msg": "" },
1294 });
1295 add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1296 node
1297}
1298
1299fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1300 let mut context = json!({ "id": "facets.histogram" });
1301 add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1302 context
1303}
1304
1305fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1306 let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1307 add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1308 instance
1309}
1310
1311fn add_histogram_summary_shape(
1312 object: &mut Value,
1313 dimensions: u64,
1314 points: u64,
1315 stats: &Value,
1316 include_instances: bool,
1317) {
1318 let Some(map) = object.as_object_mut() else {
1319 return;
1320 };
1321 if dimensions > 0 {
1322 if include_instances {
1323 map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1324 }
1325 map.insert(
1326 "ds".to_string(),
1327 json!({ "sl": dimensions, "qr": dimensions }),
1328 );
1329 }
1330 if points > 0 {
1331 map.insert("sts".to_string(), stats.clone());
1332 }
1333}
1334
1335fn histogram_totals(dimensions: u64) -> Value {
1336 let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1337 if dimensions > 0 {
1338 if let Some(map) = totals.as_object_mut() {
1339 map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1340 map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1341 map.insert(
1342 "dimensions".to_string(),
1343 json!({ "sl": dimensions, "qr": dimensions }),
1344 );
1345 }
1346 }
1347 totals
1348}
1349
/// Maps `true` to `1` and `false` to `0`.
fn bool_to_u64(value: bool) -> u64 {
    u64::from(value)
}
1353
1354fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1355 histogram
1356 .buckets
1357 .first()
1358 .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1359 .unwrap_or(0)
1360}
1361
1362fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1363 histogram
1364 .buckets
1365 .last()
1366 .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1367 .unwrap_or(0)
1368}
1369
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1376
/// A Netdata function request after parsing and normalization
/// (see `NetdataRequest::parse`).
#[derive(Debug, Clone)]
struct NetdataRequest {
    // The "info" request flag; `false` when absent.
    info: bool,
    // Normalized echo of the request, built by `normalized_request_echo`.
    echo: Value,
    // Lower bound of the time window as produced by `normalize_time_window`.
    after_realtime_usec: Option<u64>,
    // Upper bound of the time window as produced by `normalize_time_window`.
    before_realtime_usec: Option<u64>,
    // The "if_modified_since" request value; `0` when absent.
    if_modified_since_usec: u64,
    // Paging anchor resolved by `request_anchor_and_direction`.
    anchor: ExplorerAnchor,
    // Traversal direction resolved by `request_anchor_and_direction`.
    direction: Direction,
    // Requested row limit raised to at least 2; `unfiltered_vocabulary` resets it to 0.
    limit: usize,
    // The "data_only" request flag; `false` when absent.
    data_only: bool,
    // Delta mode as decided by `request_delta`.
    delta: bool,
    // Tail mode as decided by `request_tail`.
    tail: bool,
    // The "sampling" request value, `DEFAULT_ITEMS_SAMPLING` when absent; `0` turns sampling off.
    sampling: u64,
    // Bitmask of `SOURCE_TYPE_*` flags selected by the request.
    source_type: u64,
    // Source names that a file's exact source name is compared against.
    exact_sources: Vec<String>,
    // Field filters parsed from the "selections" member.
    filters: Vec<ExplorerFilter>,
    // Facet fields as resolved by `request_facets`.
    facets: Vec<Vec<u8>>,
    // Histogram field name as resolved by `request_histogram_or_default`.
    histogram: Option<String>,
    // Full-text search terms produced by `parse_fts_query_patterns`.
    fts_terms: Vec<ExplorerFtsPattern>,
    // Full-text search patterns produced by `parse_fts_query_patterns`.
    fts_patterns: Vec<Vec<u8>>,
    // Negative full-text search patterns produced by `parse_fts_query_patterns`.
    fts_negative_patterns: Vec<Vec<u8>>,
}
1400
1401impl NetdataRequest {
1402 fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1403 let object = value.as_object().ok_or_else(|| {
1404 SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1405 })?;
1406 let now_seconds = unix_now_seconds();
1407 let info = get_bool(object, "info").unwrap_or(false);
1408 let after = get_i64(object, "after");
1409 let before = get_i64(object, "before");
1410 let (after_realtime_usec, before_realtime_usec) =
1411 normalize_time_window(now_seconds, after, before);
1412 let direction = request_direction(object);
1413 let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1414 let data_only = get_bool(object, "data_only").unwrap_or(false);
1415 let delta = request_delta(data_only, object);
1416 let tail = request_tail(data_only, if_modified_since_usec, object);
1417 let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1418 let (anchor, direction) = request_anchor_and_direction(
1419 object,
1420 tail,
1421 direction,
1422 after_realtime_usec,
1423 before_realtime_usec,
1424 );
1425 let requested_limit = request_limit(object);
1426 let limit = requested_limit.max(2);
1427 let requested_facets = parse_string_array(object.get("facets"));
1428 let facets = request_facets(&requested_facets, config);
1429 let requested_histogram = request_histogram(object);
1430 let histogram = request_histogram_or_default(&requested_histogram, config);
1431 let requested_query = request_query(object);
1432 let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1433 .as_deref()
1434 .map(parse_fts_query_patterns)
1435 .unwrap_or_default();
1436 let source_selection = parse_source_selection(object.get("selections"));
1437 let filters = parse_filters(object.get("selections"));
1438
1439 let echo_input = RequestEchoInput {
1440 info,
1441 after_realtime_usec,
1442 before_realtime_usec,
1443 if_modified_since_usec,
1444 anchor,
1445 direction,
1446 limit: requested_limit,
1447 data_only,
1448 delta,
1449 tail,
1450 sampling,
1451 source_type: source_selection.source_type,
1452 requested_facets: requested_facets.as_deref(),
1453 selections: object.get("selections"),
1454 histogram: requested_histogram.as_deref(),
1455 query: requested_query.as_deref(),
1456 };
1457 let echo = normalized_request_echo(&echo_input);
1458
1459 Ok(Self {
1460 info,
1461 echo,
1462 after_realtime_usec,
1463 before_realtime_usec,
1464 if_modified_since_usec,
1465 anchor,
1466 direction,
1467 limit,
1468 data_only,
1469 delta,
1470 tail,
1471 sampling,
1472 source_type: source_selection.source_type,
1473 exact_sources: source_selection.exact_sources,
1474 filters,
1475 facets,
1476 histogram,
1477 fts_terms,
1478 fts_patterns,
1479 fts_negative_patterns,
1480 })
1481 }
1482
1483 fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1484 if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1485 return true;
1486 }
1487 if self.source_type & SOURCE_TYPE_ALL != 0 {
1488 return true;
1489 }
1490 let file_source_type = metadata
1491 .and_then(|metadata| metadata.source_type)
1492 .unwrap_or_else(|| journal_file_source_type(path));
1493 if file_source_type & self.source_type != 0 {
1494 return true;
1495 }
1496 if self.exact_sources.is_empty() {
1497 return false;
1498 }
1499 let source_name = metadata
1500 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1501 .or_else(|| journal_file_exact_source_name(path));
1502 self.exact_sources
1503 .iter()
1504 .any(|source| source_name.as_deref() == Some(source.as_str()))
1505 }
1506
1507 fn to_explorer_query(
1508 &self,
1509 matched_files: u64,
1510 file_header: Option<FileHeader>,
1511 realtime_slack_usec: u64,
1512 ) -> ExplorerQuery {
1513 let analysis_enabled = !self.data_only || self.delta;
1514 let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1515 let backward_page_anchor = self.data_only
1516 && !tail_anchor
1517 && self.direction == Direction::Backward
1518 && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1519 let after_realtime_usec = if tail_anchor {
1520 tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
1521 } else {
1522 self.after_realtime_usec
1523 };
1524 let before_realtime_usec = if backward_page_anchor {
1525 before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
1526 } else {
1527 self.before_realtime_usec
1528 };
1529 let anchor = if tail_anchor || backward_page_anchor {
1530 ExplorerAnchor::Auto
1531 } else {
1532 self.anchor
1533 };
1534 let sampling = (analysis_enabled
1535 && self.sampling != 0
1536 && matched_files != 0
1537 && after_realtime_usec.is_some()
1538 && before_realtime_usec.is_some())
1539 .then(|| {
1540 let header = file_header.unwrap_or(FileHeader {
1541 signature: [0; 8],
1542 compatible_flags: 0,
1543 incompatible_flags: 0,
1544 state: 0,
1545 header_size: 0,
1546 n_entries: 0,
1547 head_entry_realtime: 0,
1548 tail_entry_realtime: 0,
1549 head_entry_seqnum: 0,
1550 tail_entry_seqnum: 0,
1551 tail_entry_boot_id: [0; 16],
1552 seqnum_id: [0; 16],
1553 });
1554 let messages_in_file = header
1555 .tail_entry_seqnum
1556 .checked_sub(header.head_entry_seqnum)
1557 .and_then(|span| span.checked_add(1))
1558 .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1559 .unwrap_or(header.n_entries);
1560 ExplorerSampling {
1561 budget: self.sampling,
1562 matched_files,
1563 file_head_realtime_usec: header.head_entry_realtime,
1564 file_tail_realtime_usec: header.tail_entry_realtime,
1565 file_head_seqnum: header.head_entry_seqnum,
1566 file_tail_seqnum: header.tail_entry_seqnum,
1567 file_entries: messages_in_file,
1568 }
1569 });
1570 ExplorerQuery {
1571 after_realtime_usec,
1572 before_realtime_usec,
1573 anchor,
1574 direction: self.direction,
1575 limit: self.limit,
1576 filters: self.filters.clone(),
1577 facets: analysis_enabled
1578 .then(|| self.facets.clone())
1579 .unwrap_or_default(),
1580 histogram: analysis_enabled
1581 .then(|| {
1582 self.histogram
1583 .as_ref()
1584 .map(|field| field.as_bytes().to_vec())
1585 })
1586 .flatten(),
1587 histogram_after_realtime_usec: self.after_realtime_usec,
1588 histogram_before_realtime_usec: self.before_realtime_usec,
1589 histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1590 fts_terms: self.fts_terms.clone(),
1591 fts_patterns: self.fts_patterns.clone(),
1592 fts_negative_patterns: self.fts_negative_patterns.clone(),
1593 field_mode: ExplorerFieldMode::FirstValue,
1594 exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1595 use_source_realtime: true,
1596 realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1597 stop_when_rows_full: self.data_only && !tail_anchor,
1598 stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1599 sampling,
1600 debug_collect_column_fields_by_row_traversal: false,
1601 }
1602 }
1603
1604 fn file_query(
1605 &self,
1606 matched_files: usize,
1607 file_header: FileHeader,
1608 order: &JournalFileOrderInfo,
1609 ) -> ExplorerQuery {
1610 let mut query = self.to_explorer_query(
1611 matched_files as u64,
1612 Some(file_header),
1613 order.journal_vs_realtime_delta_usec,
1614 );
1615 if self.data_only && self.delta {
1616 query.stop_when_rows_full = false;
1617 }
1618 query
1619 }
1620
1621 fn unfiltered_vocabulary(&self) -> Self {
1622 let mut request = self.clone();
1623 request.filters.clear();
1624 request.histogram = None;
1625 request.limit = 0;
1626 request.fts_terms.clear();
1627 request.fts_patterns.clear();
1628 request.fts_negative_patterns.clear();
1629 request
1630 }
1631
1632 fn distinct_filter_fields(&self) -> usize {
1633 self.filters
1634 .iter()
1635 .map(|filter| filter.field.as_slice())
1636 .collect::<HashSet<_>>()
1637 .len()
1638 }
1639}
1640
/// A result row paired with the journal file it came from, so its payloads
/// can be re-read from that file later (see `expand_located_row_payloads`).
#[derive(Debug, Clone)]
struct LocatedRow {
    // Path of the journal file that produced `row`.
    file_path: PathBuf,
    // The row itself; its cursor is used to locate the entry again in `file_path`.
    row: ExplorerRow,
}
1646
1647#[derive(Debug)]
1648struct NetdataRealtimeAdjuster {
1649 direction: Direction,
1650 last_realtime_from: u64,
1651 last_realtime_to: u64,
1652}
1653
1654impl NetdataRealtimeAdjuster {
1655 fn new(direction: Direction) -> Self {
1656 Self {
1657 direction,
1658 last_realtime_from: 0,
1659 last_realtime_to: 0,
1660 }
1661 }
1662
1663 fn adjust(&mut self, realtime_usec: u64) -> u64 {
1664 match self.direction {
1665 Direction::Backward => {
1666 if realtime_usec >= self.last_realtime_from
1667 && realtime_usec <= self.last_realtime_to
1668 {
1669 self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1670 self.last_realtime_from
1671 } else {
1672 self.last_realtime_from = realtime_usec;
1673 self.last_realtime_to = realtime_usec;
1674 realtime_usec
1675 }
1676 }
1677 Direction::Forward => {
1678 if realtime_usec >= self.last_realtime_from
1679 && realtime_usec <= self.last_realtime_to
1680 {
1681 self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1682 self.last_realtime_to
1683 } else {
1684 self.last_realtime_from = realtime_usec;
1685 self.last_realtime_to = realtime_usec;
1686 realtime_usec
1687 }
1688 }
1689 }
1690 }
1691}
1692
/// Aggregate description of a set of journal files: how many there are, how
/// large they are, and the time range they cover.
#[derive(Debug, Default)]
struct JournalSourceSummary {
    // Number of added paths whose filesystem metadata could be read.
    files: u64,
    // Sum of the on-disk sizes of those files, in bytes (saturating).
    total_size: u64,
    // Earliest first-entry timestamp seen so far, in microseconds; `None` until one is known.
    first_realtime_usec: Option<u64>,
    // Latest last-entry timestamp seen so far, in microseconds; `None` until one is known.
    last_realtime_usec: Option<u64>,
}
1700
1701impl JournalSourceSummary {
1702 #[cfg(test)]
1703 fn from_paths(
1704 paths: &[PathBuf],
1705 reader_options: ReaderOptions,
1706 options: &NetdataFunctionRunOptions<'_>,
1707 ) -> Self {
1708 let mut summary = Self::default();
1709 for path in paths {
1710 let metadata = file_metadata(options, path);
1711 summary.add_path(path, reader_options, metadata.as_ref());
1712 }
1713 summary
1714 }
1715
1716 fn add_path(
1717 &mut self,
1718 path: &Path,
1719 reader_options: ReaderOptions,
1720 metadata: Option<&NetdataJournalFileMetadata>,
1721 ) {
1722 if let Ok(metadata) = std::fs::metadata(path) {
1723 self.files = self.files.saturating_add(1);
1724 self.total_size = self.total_size.saturating_add(metadata.len());
1725 }
1726 if let Some(metadata) = metadata {
1727 let metadata_first = metadata.msg_first_realtime_usec;
1728 let metadata_last = metadata.msg_last_realtime_usec;
1729 if let Some(first) = metadata_first {
1730 self.first_realtime_usec = Some(
1731 self.first_realtime_usec
1732 .map_or(first, |current| current.min(first)),
1733 );
1734 }
1735 if let Some(last) = metadata_last {
1736 self.last_realtime_usec = Some(
1737 self.last_realtime_usec
1738 .map_or(last, |current| current.max(last)),
1739 );
1740 }
1741 if metadata_first.is_some() && metadata_last.is_some() {
1742 return;
1743 }
1744 }
1745 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1746 return;
1747 };
1748 let header = reader.header();
1749 if header.head_entry_realtime != 0 {
1750 self.first_realtime_usec = Some(
1751 self.first_realtime_usec
1752 .map_or(header.head_entry_realtime, |current| {
1753 current.min(header.head_entry_realtime)
1754 }),
1755 );
1756 }
1757 if header.tail_entry_realtime != 0 {
1758 self.last_realtime_usec = Some(
1759 self.last_realtime_usec
1760 .map_or(header.tail_entry_realtime, |current| {
1761 current.max(header.tail_entry_realtime)
1762 }),
1763 );
1764 }
1765 }
1766
1767 fn info(&self) -> String {
1768 let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1769 (Some(first), Some(last)) if last > first && (last - first) >= 1_000_000 => {
1770 human_duration_seconds((last - first) / 1_000_000)
1771 }
1772 _ => "off".to_string(),
1773 };
1774 let last_entry = self
1775 .last_realtime_usec
1776 .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1777 .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1778 .unwrap_or_else(|| "unknown".to_string());
1779 format!(
1780 "{} files, total size {}, covering {}, last entry at {}",
1781 self.files,
1782 human_binary_size(self.total_size),
1783 coverage,
1784 last_entry
1785 )
1786 }
1787}
1788
1789fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1790 if summary.files == 0 {
1791 return;
1792 }
1793 target.push(json!({
1794 "id": id,
1795 "name": id,
1796 "info": summary.info(),
1797 "pill": human_binary_size(summary.total_size),
1798 }));
1799}
1800
1801fn expand_located_row_payloads(
1802 located: &mut LocatedRow,
1803 reader_options: ReaderOptions,
1804) -> Result<()> {
1805 let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1806 reader.seek_cursor(&located.row.cursor)?;
1807 if !reader.test_cursor(&located.row.cursor)? {
1808 return Err(SdkError::InvalidCursor(format!(
1809 "selected row cursor is no longer available: {}",
1810 located.row.cursor
1811 )));
1812 }
1813 reader.collect_entry_payloads(&mut located.row.payloads)
1814}
1815
/// Accumulated outcome of running one request across several journal files.
#[derive(Debug, Default)]
struct CombinedResult {
    // Rows gathered so far, each with its originating file; re-sorted and truncated after every merge.
    rows: Vec<LocatedRow>,
    // Facet counts: field name -> (value -> count), summed across merged files.
    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
    // Histogram merged across files, if any file produced one.
    histogram: Option<ExplorerHistogram>,
    // Names of the column fields seen; names that are not valid UTF-8 are dropped on merge.
    column_fields: BTreeSet<String>,
    // Statistics summed across merged files.
    stats: ExplorerStats,
    // Paging counters; not written by the code visible here.
    page_counters: Option<NetdataPageCounters>,
    // NOTE(review): presumably the number of files that matched the request; confirm against the caller.
    matched_files: u64,
    // NOTE(review): presumably the paths of the matched files; confirm against the caller.
    matched_paths: Vec<PathBuf>,
    // NOTE(review): presumably the number of files skipped; confirm against the caller.
    skipped_files: u64,
    // Human-readable error messages; row expansion failures are recorded here.
    file_errors: Vec<String>,
    // Set when the result is incomplete, e.g. when a row's payloads could not be re-read.
    partial: bool,
    // NOTE(review): presumably set when the run hit its timeout; not written in the visible code.
    timed_out: bool,
    // NOTE(review): presumably set when the run was cancelled; not written in the visible code.
    cancelled: bool,
    // NOTE(review): presumably set when sampling was active; not written in the visible code.
    sampling_enabled: bool,
}
1833
/// Snapshot of the paging counters kept by `NetdataPageWindow`
/// (see `NetdataPageWindow::counters`).
#[derive(Clone, Copy, Debug, Default)]
struct NetdataPageCounters {
    // Entries that passed the anchor checks.
    matched: u64,
    // The window's `skips_before` counter.
    before: u64,
    // The window's `skips_after` counter plus its `shifts` counter (saturating).
    after: u64,
}
1840
/// Position of a run within its list of files, plus its start time.
// NOTE(review): no user of this type is visible in this part of the file;
// the field notes below are inferred from the names and should be confirmed.
#[derive(Clone, Copy)]
struct ProgressContext {
    // Presumably the index of the file currently being processed.
    current_file: usize,
    // Presumably the total number of files to process.
    total_files: usize,
    // Presumably the instant at which the run started.
    started: Instant,
}
1847
/// Heap of retained row timestamps, ordered so that the entry to evict next
/// sits at the top.
#[derive(Debug)]
enum NetdataPageHeap {
    // Backward paging: `Reverse` turns the max-heap into a min-heap, so `peek` yields the oldest retained timestamp.
    Backward(BinaryHeap<Reverse<u64>>),
    // Forward paging: plain max-heap, so `peek` yields the newest retained timestamp.
    Forward(BinaryHeap<u64>),
}
1853
/// Tracks which row timestamps would make it into the returned page, and
/// counts the entries that fall outside of it.
#[derive(Debug)]
struct NetdataPageWindow {
    // Paging direction copied from the request.
    direction: Direction,
    // Realtime anchor of a non-tail request; entries at or past it (against the direction) are rejected.
    anchor_start_usec: Option<u64>,
    // Realtime anchor of a tail request; entries at or past it (along the direction) are rejected.
    anchor_stop_usec: Option<u64>,
    // Maximum number of timestamps retained; `0` disables retention.
    limit: usize,
    // Retained timestamps, with the next eviction candidate on top.
    heap: NetdataPageHeap,
    // Smallest timestamp currently retained, if any.
    oldest_retained_usec: Option<u64>,
    // Largest timestamp currently retained, if any.
    newest_retained_usec: Option<u64>,
    // Entries observed that passed the anchor checks (with a non-zero limit).
    matched: u64,
    // Entries counted as lying before the page.
    skips_before: u64,
    // Entries counted as lying after the page; starts at 1 for tail+delta requests with a realtime anchor.
    skips_after: u64,
    // Number of times a retained timestamp was evicted in favour of a new one.
    shifts: u64,
}
1868
1869impl NetdataPageWindow {
1870 fn for_request(request: &NetdataRequest) -> Self {
1871 let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1872 (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1873 (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1874 _ => (None, None),
1875 };
1876 let heap = match request.direction {
1877 Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1878 Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1879 };
1880 Self {
1881 direction: request.direction,
1882 anchor_start_usec,
1883 anchor_stop_usec,
1884 limit: request.limit,
1885 heap,
1886 oldest_retained_usec: None,
1887 newest_retained_usec: None,
1888 matched: 0,
1889 skips_before: 0,
1890 skips_after: if request.tail
1891 && request.delta
1892 && matches!(request.anchor, ExplorerAnchor::Realtime(_))
1893 {
1894 1
1895 } else {
1896 0
1897 },
1898 shifts: 0,
1899 }
1900 }
1901
1902 fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1903 if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1904 return false;
1905 }
1906 if self.retained_len() < self.limit {
1907 return true;
1908 }
1909 self.oldest_retained_usec
1910 .zip(self.newest_retained_usec)
1911 .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1912 }
1913
1914 fn observe(&mut self, realtime_usec: u64) {
1915 if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1916 return;
1917 }
1918 self.matched = self.matched.saturating_add(1);
1919 match (&mut self.heap, self.direction) {
1920 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1921 if heap.len() < self.limit {
1922 heap.push(Reverse(realtime_usec));
1923 self.add_retained_bound(realtime_usec);
1924 return;
1925 }
1926 let Some(Reverse(oldest)) = heap.peek().copied() else {
1927 heap.push(Reverse(realtime_usec));
1928 self.refresh_retained_bounds();
1929 return;
1930 };
1931 if realtime_usec < oldest {
1932 self.skips_after = self.skips_after.saturating_add(1);
1933 return;
1934 }
1935 heap.pop();
1936 heap.push(Reverse(realtime_usec));
1937 self.refresh_retained_bounds();
1938 self.shifts = self.shifts.saturating_add(1);
1939 }
1940 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1941 if heap.len() < self.limit {
1942 heap.push(realtime_usec);
1943 self.add_retained_bound(realtime_usec);
1944 return;
1945 }
1946 let Some(newest) = heap.peek().copied() else {
1947 heap.push(realtime_usec);
1948 self.refresh_retained_bounds();
1949 return;
1950 };
1951 if realtime_usec > newest {
1952 self.skips_before = self.skips_before.saturating_add(1);
1953 return;
1954 }
1955 heap.pop();
1956 heap.push(realtime_usec);
1957 self.refresh_retained_bounds();
1958 self.shifts = self.shifts.saturating_add(1);
1959 }
1960 _ => {}
1961 }
1962 }
1963
1964 fn retained_len(&self) -> usize {
1965 match &self.heap {
1966 NetdataPageHeap::Backward(heap) => heap.len(),
1967 NetdataPageHeap::Forward(heap) => heap.len(),
1968 }
1969 }
1970
1971 fn add_retained_bound(&mut self, realtime_usec: u64) {
1972 self.oldest_retained_usec = Some(
1973 self.oldest_retained_usec
1974 .map_or(realtime_usec, |current| current.min(realtime_usec)),
1975 );
1976 self.newest_retained_usec = Some(
1977 self.newest_retained_usec
1978 .map_or(realtime_usec, |current| current.max(realtime_usec)),
1979 );
1980 }
1981
1982 fn refresh_retained_bounds(&mut self) {
1983 let (oldest, newest) = match &self.heap {
1984 NetdataPageHeap::Backward(heap) => heap
1985 .iter()
1986 .map(|Reverse(usec)| *usec)
1987 .fold((None, None), retained_bounds_fold),
1988 NetdataPageHeap::Forward(heap) => heap
1989 .iter()
1990 .copied()
1991 .fold((None, None), retained_bounds_fold),
1992 };
1993 self.oldest_retained_usec = oldest;
1994 self.newest_retained_usec = newest;
1995 }
1996
1997 fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1998 match self.direction {
1999 Direction::Backward => {
2000 if self
2001 .anchor_start_usec
2002 .is_some_and(|anchor| realtime_usec >= anchor)
2003 {
2004 self.skips_before = self.skips_before.saturating_add(1);
2005 return false;
2006 }
2007 if self
2008 .anchor_stop_usec
2009 .is_some_and(|anchor| realtime_usec <= anchor)
2010 {
2011 self.skips_after = self.skips_after.saturating_add(1);
2012 return false;
2013 }
2014 }
2015 Direction::Forward => {
2016 if self
2017 .anchor_start_usec
2018 .is_some_and(|anchor| realtime_usec <= anchor)
2019 {
2020 self.skips_after = self.skips_after.saturating_add(1);
2021 return false;
2022 }
2023 if self
2024 .anchor_stop_usec
2025 .is_some_and(|anchor| realtime_usec >= anchor)
2026 {
2027 self.skips_before = self.skips_before.saturating_add(1);
2028 return false;
2029 }
2030 }
2031 }
2032 true
2033 }
2034
2035 fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2036 match self.direction {
2037 Direction::Backward => {
2038 if self
2039 .anchor_start_usec
2040 .is_some_and(|anchor| realtime_usec >= anchor)
2041 {
2042 return false;
2043 }
2044 if self
2045 .anchor_stop_usec
2046 .is_some_and(|anchor| realtime_usec <= anchor)
2047 {
2048 return false;
2049 }
2050 }
2051 Direction::Forward => {
2052 if self
2053 .anchor_start_usec
2054 .is_some_and(|anchor| realtime_usec <= anchor)
2055 {
2056 return false;
2057 }
2058 if self
2059 .anchor_stop_usec
2060 .is_some_and(|anchor| realtime_usec >= anchor)
2061 {
2062 return false;
2063 }
2064 }
2065 }
2066 true
2067 }
2068
2069 fn counters(&self) -> NetdataPageCounters {
2070 NetdataPageCounters {
2071 matched: self.matched,
2072 before: self.skips_before,
2073 after: self.skips_after.saturating_add(self.shifts),
2074 }
2075 }
2076
2077 fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2078 if self.limit == 0 {
2079 return false;
2080 }
2081 match (&self.heap, self.direction) {
2082 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2083 if heap.len() < self.limit {
2084 return false;
2085 }
2086 heap.peek().is_some_and(|Reverse(oldest)| {
2087 realtime_usec < oldest.saturating_sub(slack_usec)
2088 })
2089 }
2090 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2091 if heap.len() < self.limit {
2092 return false;
2093 }
2094 heap.peek()
2095 .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2096 }
2097 _ => false,
2098 }
2099 }
2100}
2101
/// Fold step that widens an `(oldest, newest)` pair to include
/// `realtime_usec`.
///
/// A missing bound is replaced by the new timestamp, so both bounds are
/// always `Some` in the result.
fn retained_bounds_fold(
    (oldest, newest): (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let widened_oldest = match oldest {
        Some(current) if current <= realtime_usec => current,
        _ => realtime_usec,
    };
    let widened_newest = match newest {
        Some(current) if current >= realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(widened_oldest), Some(widened_newest))
}
2111
2112impl CombinedResult {
2113 fn merge(
2114 &mut self,
2115 path: &Path,
2116 result: ExplorerResult,
2117 direction: Direction,
2118 limit: usize,
2119 ) -> Result<()> {
2120 let ExplorerResult {
2121 rows,
2122 facets,
2123 histogram,
2124 stats,
2125 column_fields,
2126 ..
2127 } = result;
2128
2129 if let Some(histogram) = histogram {
2130 merge_histogram(&mut self.histogram, histogram)?;
2131 }
2132
2133 self.merge_stats(stats);
2134 for row in rows {
2135 self.rows.push(LocatedRow {
2136 file_path: path.to_path_buf(),
2137 row,
2138 });
2139 }
2140 for field in column_fields {
2141 if let Ok(field) = String::from_utf8(field) {
2142 self.column_fields.insert(field);
2143 }
2144 }
2145 for (field, values) in facets {
2146 let target = self.facets.entry(field).or_default();
2147 for (value, count) in values {
2148 add_netdata_facet_count(target, &value, count);
2149 }
2150 }
2151 self.sort_and_limit(direction, limit);
2152 Ok(())
2153 }
2154
2155 fn add_column_fields<I>(&mut self, fields: I)
2156 where
2157 I: IntoIterator<Item = String>,
2158 {
2159 self.column_fields.extend(fields);
2160 }
2161
2162 fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2163 match direction {
2164 Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2165 Direction::Backward => self
2166 .rows
2167 .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2168 }
2169 make_row_timestamps_unique(&mut self.rows, direction);
2170 if self.rows.len() > limit {
2171 self.rows.truncate(limit);
2172 }
2173 self.stats.rows_returned = self.rows.len() as u64;
2174 }
2175
2176 fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2177 if self.rows.is_empty() {
2178 self.stats.rows_returned = 0;
2179 return;
2180 }
2181
2182 let mut rows = Vec::with_capacity(self.rows.len());
2183 for mut located in self.rows.drain(..) {
2184 if !located.row.payloads.is_empty() {
2185 rows.push(located);
2186 continue;
2187 }
2188 match expand_located_row_payloads(&mut located, reader_options) {
2189 Ok(()) => {
2190 self.stats.returned_row_expansions =
2191 self.stats.returned_row_expansions.saturating_add(1);
2192 rows.push(located);
2193 }
2194 Err(err) => {
2195 self.partial = true;
2196 self.file_errors.push(format!(
2197 "{} cursor {}: {err}",
2198 located.file_path.display(),
2199 located.row.cursor
2200 ));
2201 }
2202 }
2203 }
2204 self.rows = rows;
2205 self.stats.rows_returned = self.rows.len() as u64;
2206 }
2207
2208 fn merge_stats(&mut self, stats: ExplorerStats) {
2209 self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2210 self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2211 self.stats.facet_rows_matched = self
2212 .stats
2213 .facet_rows_matched
2214 .saturating_add(stats.facet_rows_matched);
2215 self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2216 self.stats.rows_unsampled = self
2217 .stats
2218 .rows_unsampled
2219 .saturating_add(stats.rows_unsampled);
2220 self.stats.rows_estimated = self
2221 .stats
2222 .rows_estimated
2223 .saturating_add(stats.rows_estimated);
2224 self.stats.sampling_sampled = self
2225 .stats
2226 .sampling_sampled
2227 .saturating_add(stats.sampling_sampled);
2228 self.stats.sampling_unsampled = self
2229 .stats
2230 .sampling_unsampled
2231 .saturating_add(stats.sampling_unsampled);
2232 self.stats.sampling_estimated = self
2233 .stats
2234 .sampling_estimated
2235 .saturating_add(stats.sampling_estimated);
2236 if stats.last_realtime_usec > self.stats.last_realtime_usec {
2237 self.stats.last_realtime_usec = stats.last_realtime_usec;
2238 }
2239 if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2240 self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2241 }
2242 self.stats.data_refs_seen = self
2243 .stats
2244 .data_refs_seen
2245 .saturating_add(stats.data_refs_seen);
2246 self.stats.data_refs_skipped = self
2247 .stats
2248 .data_refs_skipped
2249 .saturating_add(stats.data_refs_skipped);
2250 self.stats.data_payloads_loaded = self
2251 .stats
2252 .data_payloads_loaded
2253 .saturating_add(stats.data_payloads_loaded);
2254 self.stats.data_objects_classified = self
2255 .stats
2256 .data_objects_classified
2257 .saturating_add(stats.data_objects_classified);
2258 self.stats.data_cache_hits = self
2259 .stats
2260 .data_cache_hits
2261 .saturating_add(stats.data_cache_hits);
2262 self.stats.data_cache_misses = self
2263 .stats
2264 .data_cache_misses
2265 .saturating_add(stats.data_cache_misses);
2266 self.stats.payloads_decompressed = self
2267 .stats
2268 .payloads_decompressed
2269 .saturating_add(stats.payloads_decompressed);
2270 self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2271 self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2272 self.stats.histogram_updates = self
2273 .stats
2274 .histogram_updates
2275 .saturating_add(stats.histogram_updates);
2276 self.stats.returned_row_expansions = self
2277 .stats
2278 .returned_row_expansions
2279 .saturating_add(stats.returned_row_expansions);
2280 self.stats.early_stop_opportunities = self
2281 .stats
2282 .early_stop_opportunities
2283 .saturating_add(stats.early_stop_opportunities);
2284 self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2285 }
2286
2287 fn add_zero_count_facet_values(
2288 &mut self,
2289 vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2290 ) {
2291 for (field, values) in vocabulary {
2292 let target = self.facets.entry(field.clone()).or_default();
2293 for value in values.keys() {
2294 add_netdata_facet_count(target, value, 0);
2295 }
2296 }
2297 }
2298
2299 fn add_zero_count_facet_values_from_files(
2300 &mut self,
2301 fields: &[Vec<u8>],
2302 reader_options: ReaderOptions,
2303 ) {
2304 for path in &self.matched_paths {
2305 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2306 continue;
2307 };
2308 for field in fields {
2309 let Ok(field_name) = std::str::from_utf8(field) else {
2310 continue;
2311 };
2312 let Ok(values) = reader.query_unique(field_name) else {
2313 continue;
2314 };
2315 if values.is_empty() {
2316 continue;
2317 }
2318 let target = self.facets.entry(field.clone()).or_default();
2319 for value in values {
2320 add_netdata_facet_count(target, &value, 0);
2321 }
2322 }
2323 }
2324 }
2325
2326 fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2327 let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2328 if let Some(histogram) = &request.histogram {
2329 report_fields.insert(histogram.as_bytes().to_vec());
2330 }
2331 for filter in &request.filters {
2332 if !report_fields.contains(&filter.field) {
2333 continue;
2334 }
2335 let target = self.facets.entry(filter.field.clone()).or_default();
2336 for value in &filter.values {
2337 add_netdata_facet_count(target, value, 0);
2338 }
2339 }
2340 }
2341
2342 fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2343 string_fields(&self.reportable_facet_fields_bytes(requested))
2344 }
2345
2346 fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2347 let mut fields = Vec::new();
2348 for field in requested {
2349 if !fields.contains(field) {
2350 fields.push(field.clone());
2351 }
2352 }
2353 fields
2354 }
2355}
2356
2357fn netdata_function_error(status: u64, message: &str) -> Value {
2358 json!({
2359 "status": status,
2360 "errorMessage": message,
2361 })
2362}
2363
2364fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2365 if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2366 return Value::String("OK".to_string());
2367 }
2368
2369 let total = stats
2370 .rows_examined
2371 .saturating_add(stats.rows_unsampled)
2372 .saturating_add(stats.rows_estimated)
2373 .max(1);
2374 let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2375 let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2376 let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2377
2378 let mut title = String::new();
2379 let mut description = String::new();
2380 let mut status = "notice";
2381 if timed_out {
2382 title.push_str("Query timed-out, incomplete data. ");
2383 description.push_str(
2384 "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2385 );
2386 status = "warning";
2387 }
2388 if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2389 title.push_str(&format!("{real_percent:.2}% real data"));
2390 description.push_str(&format!(
2391 "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2392 ));
2393 }
2394 if stats.rows_unsampled != 0 {
2395 title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2396 description.push_str(&format!(
2397 "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2398 ));
2399 }
2400 if stats.rows_estimated != 0 {
2401 title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2402 description.push_str(&format!(
2403 "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2404 ));
2405 }
2406
2407 json!({
2408 "title": title,
2409 "status": status,
2410 "description": description,
2411 })
2412}
2413
2414fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2415 let mut stats = completed.clone();
2416 stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2417 stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2418 stats.facet_rows_matched = stats
2419 .facet_rows_matched
2420 .saturating_add(current.facet_rows_matched);
2421 stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2422 stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2423 stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2424 stats.sampling_sampled = stats
2425 .sampling_sampled
2426 .saturating_add(current.sampling_sampled);
2427 stats.sampling_unsampled = stats
2428 .sampling_unsampled
2429 .saturating_add(current.sampling_unsampled);
2430 stats.sampling_estimated = stats
2431 .sampling_estimated
2432 .saturating_add(current.sampling_estimated);
2433 if current.last_realtime_usec > stats.last_realtime_usec {
2434 stats.last_realtime_usec = current.last_realtime_usec;
2435 }
2436 if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2437 stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2438 }
2439 stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2440 stats.data_refs_skipped = stats
2441 .data_refs_skipped
2442 .saturating_add(current.data_refs_skipped);
2443 stats.data_payloads_loaded = stats
2444 .data_payloads_loaded
2445 .saturating_add(current.data_payloads_loaded);
2446 stats.data_objects_classified = stats
2447 .data_objects_classified
2448 .saturating_add(current.data_objects_classified);
2449 stats.data_cache_hits = stats
2450 .data_cache_hits
2451 .saturating_add(current.data_cache_hits);
2452 stats.data_cache_misses = stats
2453 .data_cache_misses
2454 .saturating_add(current.data_cache_misses);
2455 stats.payloads_decompressed = stats
2456 .payloads_decompressed
2457 .saturating_add(current.payloads_decompressed);
2458 stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2459 stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2460 stats.histogram_updates = stats
2461 .histogram_updates
2462 .saturating_add(current.histogram_updates);
2463 stats.returned_row_expansions = stats
2464 .returned_row_expansions
2465 .saturating_add(current.returned_row_expansions);
2466 stats.early_stop_opportunities = stats
2467 .early_stop_opportunities
2468 .saturating_add(current.early_stop_opportunities);
2469 stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2470 stats
2471}
2472
/// Column definitions of a table response.
struct Columns {
    /// Column keys as a list; presumably the display order of the columns —
    /// TODO confirm against the code that builds this struct.
    order: Vec<String>,
    /// JSON object emitted as the response's `"columns"` member.
    map: Map<String, Value>,
}
2477
/// Pre-built JSON pieces of a query response, assembled by the response
/// helpers that follow.
struct QueryResponseArtifacts {
    /// Names of the facet fields to report (its consumer is outside this
    /// part of the file).
    reportable_facet_field_names: Vec<String>,
    /// Column definitions; `columns.map` becomes the response's `"columns"`.
    columns: Columns,
    /// Values emitted as the response's `"data"` array.
    data: Vec<Value>,
    /// Facets payload, emitted as `"facets"` or `"facets_delta"`.
    facets: Value,
    /// Histogram payload, emitted as `"histogram"` or `"histogram_delta"`;
    /// `None` is written out as JSON `null`.
    histogram: Option<Value>,
    /// Message value (its consumer is outside this part of the file).
    message: Value,
    /// Item counters, emitted as `"items"` or `"items_delta"`.
    items: Value,
}
2487
2488fn base_query_response(
2489 request: &NetdataRequest,
2490 combined: &CombinedResult,
2491 artifacts: &QueryResponseArtifacts,
2492) -> Value {
2493 json!({
2494 "_request": &request.echo,
2495 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2496 "_journal_files": {
2497 "matched": combined.matched_files,
2498 "skipped": combined.skipped_files,
2499 "errors": &combined.file_errors,
2500 },
2501 "status": 200,
2502 "partial": combined.partial,
2503 "type": "table",
2504 "show_ids": true,
2505 "has_history": true,
2506 "pagination": {
2507 "enabled": true,
2508 "key": "anchor",
2509 "column": "timestamp",
2510 "units": "timestamp_usec",
2511 },
2512 "columns": &artifacts.columns.map,
2513 "data": &artifacts.data,
2514 "_stats": {
2515 "sdk_explorer": &combined.stats,
2516 },
2517 "expires": if request.data_only {
2518 unix_now_seconds().saturating_add(3600)
2519 } else {
2520 0
2521 }
2522 })
2523}
2524
2525fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2526 let unsampled = combined.stats.rows_unsampled;
2527 let estimated = combined.stats.rows_estimated;
2528 let fallback_rows_after_returned =
2529 response_fallback_rows_after_returned(&combined.stats, returned);
2530 let page_counters = combined
2531 .page_counters
2532 .unwrap_or_else(|| NetdataPageCounters {
2533 matched: combined.stats.rows_matched,
2534 before: 0,
2535 after: fallback_rows_after_returned,
2536 });
2537 json!({
2538 "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2539 "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2540 "unsampled": unsampled,
2541 "estimated": estimated,
2542 "returned": returned,
2543 "max_to_return": request.limit as u64,
2544 "before": page_counters.before,
2545 "after": page_counters.after,
2546 })
2547}
2548
2549fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2550 let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2551 stats.rows_examined
2552 } else {
2553 stats.rows_matched
2554 };
2555 source_rows.saturating_sub(returned)
2556}
2557
2558fn add_last_modified_if_needed(
2559 object: &mut Map<String, Value>,
2560 request: &NetdataRequest,
2561 combined: &CombinedResult,
2562) {
2563 if !request.data_only || request.tail {
2564 object.insert(
2565 "last_modified".to_string(),
2566 Value::from(combined.stats.last_realtime_usec),
2567 );
2568 }
2569}
2570
2571fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2572 if combined.sampling_enabled {
2573 object.insert(
2574 "_sampling".to_string(),
2575 json!({
2576 "enabled": true,
2577 "sampled": combined.stats.sampling_sampled,
2578 "unsampled": combined.stats.sampling_unsampled,
2579 "estimated": combined.stats.sampling_estimated,
2580 }),
2581 );
2582 }
2583}
2584
2585fn add_analysis_outputs_if_needed(
2586 object: &mut Map<String, Value>,
2587 request: &NetdataRequest,
2588 artifacts: QueryResponseArtifacts,
2589) {
2590 if !request.data_only || request.delta {
2591 let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2592 object.insert(facets_key.to_string(), artifacts.facets);
2593 object.insert(
2594 histogram_key.to_string(),
2595 artifacts.histogram.unwrap_or(Value::Null),
2596 );
2597 object.insert(items_key.to_string(), artifacts.items);
2598 }
2599}
2600
/// Returns the response keys for the facets, histogram and items outputs, in
/// that order. Data-only responses use the `*_delta` names.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    match data_only {
        true => ("facets_delta", "histogram_delta", "items_delta"),
        false => ("facets", "histogram", "items"),
    }
}
2608
2609fn merge_histogram(
2610 target: &mut Option<ExplorerHistogram>,
2611 source: ExplorerHistogram,
2612) -> Result<()> {
2613 let Some(target) = target else {
2614 *target = Some(source);
2615 return Ok(());
2616 };
2617 if target.field != source.field
2618 || target.buckets.len() != source.buckets.len()
2619 || target
2620 .buckets
2621 .iter()
2622 .zip(source.buckets.iter())
2623 .any(|(target_bucket, source_bucket)| {
2624 target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2625 || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2626 })
2627 {
2628 return Err(SdkError::Unsupported(
2629 "inconsistent Netdata histogram bucket shape",
2630 ));
2631 }
2632 for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2633 let Some(target_bucket) = target.buckets.get_mut(index) else {
2634 return Err(SdkError::Unsupported(
2635 "inconsistent Netdata histogram bucket shape",
2636 ));
2637 };
2638 for (value, count) in source_bucket.values {
2639 *target_bucket.values.entry(value).or_default() += count;
2640 }
2641 }
2642 Ok(())
2643}
2644
/// Returns `true` when the facet has at least one value other than the empty
/// string and `-`. Counts are not looked at.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    values
        .keys()
        .any(|value| !(value.is_empty() || value.as_slice() == b"-"))
}
2650
2651fn netdata_facet_value(value: &[u8]) -> &[u8] {
2652 if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2653 &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2654 } else {
2655 value
2656 }
2657}
2658
2659fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2660 *target
2661 .entry(netdata_facet_value(value).to_vec())
2662 .or_default() += count;
2663}
2664
2665fn not_modified_before_scan_response(
2666 request: &NetdataRequest,
2667 selected: &SelectedJournalFiles,
2668) -> Option<Value> {
2669 if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2670 Some(netdata_function_error(
2671 304,
2672 "No new data since the previous call.",
2673 ))
2674 } else {
2675 None
2676 }
2677}
2678
2679fn should_collect_unfiltered_facet_vocabulary(
2680 request: &NetdataRequest,
2681 combined: &CombinedResult,
2682) -> bool {
2683 !request.data_only && !combined.partial && !request.filters.is_empty()
2684}
2685
2686fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2687 ProgressContext {
2688 current_file: file_index + 1,
2689 total_files,
2690 started,
2691 }
2692}
2693
2694fn emit_netdata_progress(
2695 options: &mut NetdataFunctionRunOptions<'_>,
2696 progress: NetdataFunctionProgress,
2697) {
2698 if let Some(callback) = options.progress_callback.as_deref_mut() {
2699 callback(progress);
2700 }
2701}
2702
2703fn emit_progress_for_combined(
2704 options: &mut NetdataFunctionRunOptions<'_>,
2705 combined: &CombinedResult,
2706 context: ProgressContext,
2707) {
2708 emit_netdata_progress(
2709 options,
2710 NetdataFunctionProgress {
2711 current_file: context.current_file,
2712 total_files: context.total_files,
2713 matched_files: combined.matched_files,
2714 skipped_files: combined.skipped_files,
2715 stats: combined.stats.clone(),
2716 elapsed: context.started.elapsed(),
2717 },
2718 );
2719}
2720
2721fn emit_explorer_progress(
2722 options: &mut NetdataFunctionRunOptions<'_>,
2723 combined: &CombinedResult,
2724 progress: ExplorerProgress,
2725 context: ProgressContext,
2726) {
2727 let stats = merged_progress_stats(&combined.stats, &progress.stats);
2728 if let Some(callback) = options.progress_callback.as_deref_mut() {
2729 callback(NetdataFunctionProgress {
2730 current_file: context.current_file,
2731 total_files: context.total_files,
2732 matched_files: combined.matched_files,
2733 skipped_files: combined.skipped_files,
2734 stats,
2735 elapsed: context.started.elapsed(),
2736 });
2737 }
2738}
2739
2740fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2741 options
2742 .cancellation_callback
2743 .is_some_and(|is_cancelled| is_cancelled())
2744}
2745
2746fn should_stop_before_file(
2747 combined: &mut CombinedResult,
2748 deadline: Option<Instant>,
2749 options: &NetdataFunctionRunOptions<'_>,
2750) -> bool {
2751 if request_cancelled(options) {
2752 combined.partial = true;
2753 combined.cancelled = true;
2754 return true;
2755 }
2756 if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2757 combined.partial = true;
2758 combined.timed_out = true;
2759 return true;
2760 }
2761 false
2762}
2763
2764fn collect_column_fields_for_file(
2765 reader: &mut FileReader,
2766 request: &NetdataRequest,
2767 path: &Path,
2768 combined: &mut CombinedResult,
2769) {
2770 if request.data_only {
2771 return;
2772 }
2773 match reader.enumerate_fields_indexed() {
2774 Ok(fields) => combined.add_column_fields(fields),
2775 Err(err) => combined.file_errors.push(format!(
2776 "{}: FIELD index enumeration failed: {err}",
2777 path.display()
2778 )),
2779 }
2780}
2781
2782fn record_explore_result(
2783 result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2784 path: &Path,
2785 combined: &mut CombinedResult,
2786) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2787 match result {
2788 Ok(result) => Some(result),
2789 Err(err) => {
2790 combined.skipped_files = combined.skipped_files.saturating_add(1);
2791 combined
2792 .file_errors
2793 .push(format!("{}: {err}", path.display()));
2794 None
2795 }
2796 }
2797}
2798
2799fn delta_scan_can_stop(
2800 request: &NetdataRequest,
2801 page_window: &RefCell<NetdataPageWindow>,
2802 realtime_usec: u64,
2803 rows_matched: u64,
2804 realtime_delta_usec: u64,
2805) -> bool {
2806 let mut page_window = page_window.borrow_mut();
2807 page_window.observe(realtime_usec);
2808 request.data_only
2809 && request.delta
2810 && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2811 && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2812}
2813
2814#[allow(clippy::too_many_arguments)]
2815fn finish_explored_file(
2816 options: &mut NetdataFunctionRunOptions<'_>,
2817 request: &NetdataRequest,
2818 file: &SelectedJournalFile,
2819 query: &ExplorerQuery,
2820 result: ExplorerResult,
2821 stop_reason: Option<ExplorerStopReason>,
2822 combined: &mut CombinedResult,
2823 files: &[SelectedJournalFile],
2824 file_index: usize,
2825 progress: ProgressContext,
2826) -> Result<bool> {
2827 update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2828 combined.merge(&file.path, result, query.direction, request.limit)?;
2829 emit_progress_for_combined(options, combined, progress);
2830 if request_cancelled(options) {
2831 combined.partial = true;
2832 combined.cancelled = true;
2833 return Ok(true);
2834 }
2835 if let Some(reason) = stop_reason {
2836 combined.partial = true;
2837 match reason {
2838 ExplorerStopReason::TimedOut => combined.timed_out = true,
2839 ExplorerStopReason::Cancelled => combined.cancelled = true,
2840 }
2841 return Ok(true);
2842 }
2843 Ok(request.data_only
2844 && !request.delta
2845 && !request.tail
2846 && combined.rows.len() >= request.limit
2847 && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2848}
2849
2850fn file_metadata(
2851 options: &NetdataFunctionRunOptions<'_>,
2852 path: &Path,
2853) -> Option<NetdataJournalFileMetadata> {
2854 options
2855 .state
2856 .as_deref()
2857 .and_then(|state| state.file_metadata(path))
2858}
2859
2860fn update_learned_realtime_delta(
2861 options: &mut NetdataFunctionRunOptions<'_>,
2862 path: &Path,
2863 order: &JournalFileOrderInfo,
2864 stats: &ExplorerStats,
2865) {
2866 let learned_delta = stats.max_source_realtime_delta_usec;
2867 if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2868 return;
2869 }
2870 let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2871 if learned_delta <= order.journal_vs_realtime_delta_usec {
2872 return;
2873 }
2874 if let Some(state) = options.state.as_deref_mut() {
2875 state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2876 }
2877}
2878
2879fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2880 delta_usec
2881 .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2882 .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2883}
2884
/// Test-only check of whether a file, judged by its header, may hold entries
/// inside the request's time window.
///
/// A file whose tail timestamp is zero is always kept. Otherwise the file's
/// range is widened by `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC` on both
/// sides before it is compared with the request's bounds.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let slack = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC;
    let first = header.head_entry_realtime.saturating_sub(slack);
    let last = header.tail_entry_realtime.saturating_add(slack);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| last < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| first > before);

    !(ends_before_window || starts_after_window)
}
2913
/// A journal file chosen for a request, with the information used to order it.
#[derive(Debug)]
struct SelectedJournalFile {
    /// Path of the journal file.
    path: PathBuf,
    /// Ordering information for the file: its first and last message
    /// timestamps and its journal-vs-realtime delta.
    order: JournalFileOrderInfo,
}
2919
/// The files chosen for a request, plus a freshness flag.
#[derive(Debug, Default)]
struct SelectedJournalFiles {
    /// The chosen files, in the order they will be processed.
    files: Vec<SelectedJournalFile>,
    /// `true` when at least one chosen file has a last message timestamp
    /// greater than the request's `if_modified_since_usec`.
    files_are_newer: bool,
}
2925
2926fn select_journal_files_for_request(
2927 paths: Vec<PathBuf>,
2928 request: &NetdataRequest,
2929 reader_options: ReaderOptions,
2930 options: &NetdataFunctionRunOptions<'_>,
2931) -> SelectedJournalFiles {
2932 let mut selected = Vec::new();
2933 for path in paths {
2934 let metadata = file_metadata(options, &path);
2935 if !request.matches_source(&path, metadata.as_ref()) {
2936 continue;
2937 }
2938 let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2939 if !journal_file_order_may_overlap_request(&order, request) {
2940 continue;
2941 }
2942 selected.push(SelectedJournalFile { path, order });
2943 }
2944 selected.sort_by(|left, right| {
2945 compare_journal_file_order(&left.order, &right.order, request.direction)
2946 .then_with(|| left.path.cmp(&right.path))
2947 });
2948 let files_are_newer = selected
2949 .iter()
2950 .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2951 SelectedJournalFiles {
2952 files: selected,
2953 files_are_newer,
2954 }
2955}
2956
2957fn remaining_files_cannot_affect_data_page(
2958 combined: &CombinedResult,
2959 request: &NetdataRequest,
2960 files: &[SelectedJournalFile],
2961 next_file_index: usize,
2962) -> bool {
2963 let Some(next) = files.get(next_file_index) else {
2964 return true;
2965 };
2966 let slack = next.order.journal_vs_realtime_delta_usec;
2967 match request.direction {
2968 Direction::Backward => {
2969 let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2970 else {
2971 return false;
2972 };
2973 next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2974 }
2975 Direction::Forward => {
2976 let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2977 else {
2978 return false;
2979 };
2980 next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2981 }
2982 }
2983}
2984
2985fn journal_file_order_may_overlap_request(
2986 info: &JournalFileOrderInfo,
2987 request: &NetdataRequest,
2988) -> bool {
2989 if info.msg_last_realtime_usec == 0 {
2990 return true;
2991 }
2992
2993 let first = info
2994 .msg_first_realtime_usec
2995 .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2996 let last = info
2997 .msg_last_realtime_usec
2998 .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2999
3000 if request
3001 .after_realtime_usec
3002 .is_some_and(|after| last < after)
3003 {
3004 return false;
3005 }
3006 if request
3007 .before_realtime_usec
3008 .is_some_and(|before| first > before)
3009 {
3010 return false;
3011 }
3012
3013 true
3014}
3015
3016fn collect_boot_first_realtime(
3017 paths: &[PathBuf],
3018 reader_options: ReaderOptions,
3019 needed_boot_ids: &BTreeSet<Vec<u8>>,
3020) -> BTreeMap<Vec<u8>, u64> {
3021 let mut out = BTreeMap::new();
3022 if needed_boot_ids.is_empty() {
3023 return out;
3024 }
3025 for path in paths {
3026 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
3027 continue;
3028 };
3029 let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
3030 continue;
3031 };
3032 for boot_id in boot_ids {
3033 if !needed_boot_ids.contains(&boot_id) {
3034 continue;
3035 }
3036 let mut match_payload = b"_BOOT_ID=".to_vec();
3037 match_payload.extend_from_slice(&boot_id);
3038 reader.flush_matches();
3039 reader.add_match(&match_payload);
3040 reader.seek_head();
3041 if !reader.next().unwrap_or(false) {
3042 continue;
3043 }
3044 if let Ok(realtime) = reader.get_realtime_usec() {
3045 record_boot_first_realtime(&mut out, boot_id, realtime);
3046 }
3047 }
3048 reader.flush_matches();
3049 }
3050 out
3051}
3052
3053fn response_boot_ids(
3054 column_order: &[String],
3055 rows: &[LocatedRow],
3056 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3057 histogram: Option<&ExplorerHistogram>,
3058) -> BTreeSet<Vec<u8>> {
3059 let mut boot_ids = BTreeSet::new();
3060 let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3061 if row_needs_boot_id {
3062 for row in rows {
3063 if let Some(values) = row_fields(row).get("_BOOT_ID") {
3064 boot_ids.extend(values.iter().cloned());
3065 }
3066 }
3067 }
3068 if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3069 boot_ids.extend(
3070 values
3071 .keys()
3072 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3073 .cloned(),
3074 );
3075 }
3076 if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3077 for bucket in &histogram.buckets {
3078 boot_ids.extend(
3079 bucket
3080 .values
3081 .keys()
3082 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3083 .cloned(),
3084 );
3085 }
3086 }
3087 boot_ids
3088}
3089
/// Records `realtime_usec` for `boot_id`, keeping the smallest timestamp
/// seen so far for that boot id.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|earliest| *earliest = (*earliest).min(realtime_usec))
        .or_insert(realtime_usec);
}
3100
3101fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3102 let mut fields = BTreeMap::new();
3103 for payload in &row.row.payloads {
3104 let Some((field, value)) = split_payload(payload) else {
3105 continue;
3106 };
3107 fields
3108 .entry(String::from_utf8_lossy(field).into_owned())
3109 .or_insert_with(Vec::new)
3110 .push(value.to_vec());
3111 }
3112 fields.insert(
3113 "ND_JOURNAL_FILE".to_string(),
3114 vec![row.file_path.display().to_string().into_bytes()],
3115 );
3116 if !fields.contains_key("ND_JOURNAL_PROCESS") {
3117 let process = dynamic_process_name(&fields);
3118 if !process.is_empty() {
3119 fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3120 }
3121 }
3122 fields
3123}
3124
3125fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
3126 let base = first_value(fields, "CONTAINER_NAME")
3127 .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
3128 .or_else(|| first_value(fields, "_COMM"))
3129 .map(|value| String::from_utf8_lossy(value).into_owned())
3130 .unwrap_or_default();
3131 if base.is_empty() {
3132 return "-".to_string();
3133 }
3134 match first_value(fields, "_PID") {
3135 Some(pid) if !pid.is_empty() => {
3136 let pid = String::from_utf8_lossy(pid);
3137 format!("{base}[{pid}]")
3138 }
3139 Some(_) => base,
3140 None => format!("{base}[-]"),
3141 }
3142}
3143
/// Returns the first value stored for `field`, or `None` when the field is
/// missing or has no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    values.first().map(|value| value.as_slice())
}
3150
/// Splits a `FIELD=value` payload at the first `=` into `(field, value)`.
/// Returns `None` when the payload has no `=`; either part may be empty.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    // `rest` starts with the separator itself; the value follows it.
    Some((field, &rest[1..]))
}
3155
3156fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3157 let mut last_from = 0u64;
3158 let mut last_to = 0u64;
3159 let mut initialized = false;
3160 for row in rows {
3161 let timestamp = row.row.realtime_usec;
3162 if initialized && timestamp >= last_from && timestamp <= last_to {
3163 match direction {
3164 Direction::Backward => {
3165 last_from = last_from.saturating_sub(1);
3166 row.row.realtime_usec = last_from;
3167 }
3168 Direction::Forward => {
3169 last_to = last_to.saturating_add(1);
3170 row.row.realtime_usec = last_to;
3171 }
3172 }
3173 } else {
3174 last_from = timestamp;
3175 last_to = timestamp;
3176 initialized = true;
3177 }
3178 }
3179}
3180
3181fn column_metadata(key: &str, index: usize) -> Value {
3182 let (visible, filter, full_width) = match key {
3183 "timestamp" => (true, "range", false),
3184 "rowOptions" => (false, "none", false),
3185 "_HOSTNAME" => (true, "facet", false),
3186 "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3187 "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3188 _ if systemd_column_is_facet(key) => (false, "facet", false),
3189 _ => (false, "none", false),
3190 };
3191 let column_type = if key == "timestamp" {
3192 "timestamp"
3193 } else if key == "rowOptions" {
3194 "none"
3195 } else {
3196 "string"
3197 };
3198 let visualization = if key == "rowOptions" {
3199 "rowOptions"
3200 } else {
3201 "value"
3202 };
3203 let mut metadata = json!({
3204 "index": index,
3205 "unique_key": key == "timestamp",
3206 "name": if key == "timestamp" { "Timestamp" } else { key },
3207 "visible": visible,
3208 "type": column_type,
3209 "visualization": visualization,
3210 "value_options": {
3211 "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3212 "decimal_points": 0,
3213 "default_value": if key == "timestamp" || key == "rowOptions" {
3214 Value::Null
3215 } else {
3216 Value::String("-".to_string())
3217 },
3218 },
3219 "sort": "ascending",
3220 "sortable": false,
3221 "sticky": false,
3222 "summary": "count",
3223 "filter": filter,
3224 "full_width": full_width,
3225 "wrap": key != "rowOptions",
3226 "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3227 });
3228 if key == "rowOptions" {
3229 if let Some(object) = metadata.as_object_mut() {
3230 object.insert("dummy".to_string(), Value::Bool(true));
3231 }
3232 }
3233 metadata
3234}
3235
/// Decides whether a journal field is offered as a facet.
///
/// `MESSAGE_ID` always is. Any other key is a facet unless it contains
/// `MESSAGE` or `TIMESTAMP`, or starts with `__`.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
3245
3246fn sort_facet_options(field: &str, options: &mut [Value]) {
3247 options.sort_by(|left, right| {
3248 let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3249 let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3250 if field == "PRIORITY" {
3251 return parse_priority(left_id).cmp(&parse_priority(right_id));
3252 }
3253 let left_count = left
3254 .get("count")
3255 .and_then(Value::as_u64)
3256 .unwrap_or_default();
3257 let right_count = right
3258 .get("count")
3259 .and_then(Value::as_u64)
3260 .unwrap_or_default();
3261 right_count
3262 .cmp(&left_count)
3263 .then_with(|| left_id.cmp(right_id))
3264 });
3265}
3266
3267fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3268 let bytes = query.as_bytes();
3269 let mut index = 0usize;
3270 let mut terms = Vec::new();
3271 let mut positives = Vec::new();
3272 let mut negatives = Vec::new();
3273
3274 while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3275 push_fts_pattern(
3276 pattern,
3277 negative,
3278 &mut terms,
3279 &mut positives,
3280 &mut negatives,
3281 );
3282 }
3283
3284 (terms, positives, negatives)
3285}
3286
3287fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3288 while *index < bytes.len() {
3289 skip_fts_separators(bytes, index);
3290 let negative = consume_fts_negative_marker(bytes, index);
3291 let pattern = read_fts_pattern(bytes, index);
3292 if !pattern.is_empty() {
3293 return Some((pattern, negative));
3294 }
3295 }
3296 None
3297}
3298
/// Advances `*index` past any run of `|` separator bytes.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    // An index beyond the end yields no slice and therefore no movement.
    let run = bytes.get(*index..).map_or(0, |rest| {
        rest.iter().take_while(|&&byte| byte == b'|').count()
    });
    *index += run;
}
3304
/// Consumes a leading `!` at `*index`, if any, and reports whether one was
/// found.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let negative = matches!(bytes.get(*index), Some(b'!'));
    if negative {
        *index += 1;
    }
    negative
}
3313
/// Reads one pattern starting at `*index` and leaves `*index` after it.
///
/// An unescaped `|` ends the pattern and is consumed. A backslash escapes the
/// following byte, which is then stored literally; the backslash itself is
/// dropped (a trailing lone backslash therefore contributes nothing).
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut pattern = Vec::new();
    let mut escaped = false;
    while let Some(&byte) = bytes.get(*index) {
        *index += 1;
        match (byte, escaped) {
            (b'\\', false) => escaped = true,
            (b'|', false) => break,
            _ => {
                pattern.push(byte);
                escaped = false;
            }
        }
    }
    pattern
}
3332
3333fn push_fts_pattern(
3334 pattern: Vec<u8>,
3335 negative: bool,
3336 terms: &mut Vec<ExplorerFtsPattern>,
3337 positives: &mut Vec<Vec<u8>>,
3338 negatives: &mut Vec<Vec<u8>>,
3339) {
3340 terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3341 if negative {
3342 negatives.push(pattern);
3343 } else {
3344 positives.push(pattern);
3345 }
3346}
3347
3348fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3349 let Some(Value::Object(selections)) = value else {
3350 return Vec::new();
3351 };
3352 let mut filters = Vec::new();
3353 for (field, values) in selections {
3354 if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3355 continue;
3356 }
3357 let Some(values) = parse_string_array(Some(values)) else {
3358 continue;
3359 };
3360 filters.push(ExplorerFilter::new(
3361 field.as_bytes().to_vec(),
3362 values
3363 .into_iter()
3364 .map(|value| normalize_filter_value(field, &value)),
3365 ));
3366 }
3367 filters
3368}
3369
/// Journal sources selected by a request, as produced by
/// `parse_source_selection`.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags matching the requested
    /// category names.
    source_type: u64,
    /// Requested source names that are not one of the known category names.
    exact_sources: Vec<String>,
}
3375
3376fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3377 let mut selection = SourceSelection {
3378 source_type: SOURCE_TYPE_ALL,
3379 exact_sources: Vec::new(),
3380 };
3381 let Some(Value::Object(selections)) = value else {
3382 return selection;
3383 };
3384 let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3385 return selection;
3386 };
3387 selection.source_type = 0;
3388 for value in values {
3389 match source_type_for_name(&value) {
3390 Some(source_type) => selection.source_type |= source_type,
3391 None => selection.exact_sources.push(value),
3392 }
3393 }
3394 selection
3395}
3396
3397fn source_type_for_name(value: &str) -> Option<u64> {
3398 match value {
3399 "all" => Some(SOURCE_TYPE_ALL),
3400 "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3401 "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3402 "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3403 "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3404 "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3405 "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3406 _ => None,
3407 }
3408}
3409
3410fn journal_file_source_type(path: &Path) -> u64 {
3411 let text = path.to_string_lossy();
3412 let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3413 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3414 };
3415 if text.contains("/remote/") {
3416 return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3417 }
3418 if local_namespace_source_name(path).is_some() {
3419 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3420 }
3421 if name.starts_with("system") {
3422 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3423 }
3424 if name.starts_with("user") {
3425 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3426 }
3427 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3428}
3429
/// Derives `namespace-<name>` from the file's parent directory name.
///
/// `<name>` is whatever follows the last `.` in the directory name; `None` is
/// returned when there is no parent directory name, it is not UTF-8, it has no
/// `.`, or nothing follows the last `.`.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path.parent()?.file_name()?.to_str()?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3435
3436fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3437 let text = path.to_string_lossy();
3438 if text.contains("/remote/") {
3439 let name = path.file_name()?.to_str()?;
3440 let source = name
3441 .split_once('@')
3442 .map(|(prefix, _)| prefix)
3443 .unwrap_or_else(|| {
3444 name.strip_suffix(".journal~.zst")
3445 .or_else(|| name.strip_suffix(".journal.zst"))
3446 .or_else(|| name.strip_suffix(".journal~"))
3447 .or_else(|| name.strip_suffix(".journal"))
3448 .unwrap_or(name)
3449 });
3450 return source.starts_with("remote-").then(|| source.to_string());
3451 }
3452 local_namespace_source_name(path)
3453}
3454
3455fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3456 if field == "PRIORITY" {
3457 if let Some(priority) = priority_name_to_number(value) {
3458 return priority.as_bytes().to_vec();
3459 }
3460 }
3461 value.as_bytes().to_vec()
3462}
3463
3464fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3465 let Value::Array(items) = value? else {
3466 return None;
3467 };
3468 Some(
3469 items
3470 .iter()
3471 .filter_map(Value::as_str)
3472 .map(ToOwned::to_owned)
3473 .collect(),
3474 )
3475}
3476
3477fn request_direction(object: &Map<String, Value>) -> Direction {
3478 match get_str(object, "direction").unwrap_or("backward") {
3479 "forward" | "forwards" | "next" => Direction::Forward,
3480 _ => Direction::Backward,
3481 }
3482}
3483
3484fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3485 data_only && get_bool(object, "delta").unwrap_or(false)
3486}
3487
3488fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3489 data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3490}
3491
3492fn tail_after_realtime_bound(
3493 after_realtime_usec: Option<u64>,
3494 anchor: ExplorerAnchor,
3495) -> Option<u64> {
3496 let ExplorerAnchor::Realtime(anchor) = anchor else {
3497 return after_realtime_usec;
3498 };
3499 let tail_after = anchor.saturating_add(1);
3500 Some(
3501 after_realtime_usec
3502 .map(|after| after.max(tail_after))
3503 .unwrap_or(tail_after),
3504 )
3505}
3506
3507fn before_realtime_bound_excluding_anchor(
3508 before_realtime_usec: Option<u64>,
3509 anchor: ExplorerAnchor,
3510) -> Option<u64> {
3511 let ExplorerAnchor::Realtime(anchor) = anchor else {
3512 return before_realtime_usec;
3513 };
3514 let before_anchor = anchor.saturating_sub(1);
3515 Some(
3516 before_realtime_usec
3517 .map(|before| before.min(before_anchor))
3518 .unwrap_or(before_anchor),
3519 )
3520}
3521
3522fn request_anchor_and_direction(
3523 object: &Map<String, Value>,
3524 tail: bool,
3525 direction: Direction,
3526 after_realtime_usec: Option<u64>,
3527 before_realtime_usec: Option<u64>,
3528) -> (ExplorerAnchor, Direction) {
3529 let anchor = get_u64(object, "anchor")
3530 .filter(|value| *value != 0)
3531 .map(normalize_timestamp_to_usec)
3532 .map(ExplorerAnchor::Realtime)
3533 .unwrap_or(ExplorerAnchor::Auto);
3534 if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3535 return (anchor, Direction::Backward);
3536 }
3537 if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3538 (ExplorerAnchor::Auto, Direction::Backward)
3539 } else {
3540 (anchor, direction)
3541 }
3542}
3543
3544fn anchor_outside_window(
3545 anchor: ExplorerAnchor,
3546 after_realtime_usec: Option<u64>,
3547 before_realtime_usec: Option<u64>,
3548) -> bool {
3549 let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3550 return false;
3551 };
3552 after_realtime_usec.is_some_and(|after| anchor_usec < after)
3553 || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3554}
3555
3556fn request_limit(object: &Map<String, Value>) -> usize {
3557 get_u64(object, "last")
3558 .filter(|value| *value != 0)
3559 .map(|value| value as usize)
3560 .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3561}
3562
3563fn request_facets(
3564 requested_facets: &Option<Vec<String>>,
3565 config: &NetdataFunctionConfig,
3566) -> Vec<Vec<u8>> {
3567 requested_facets
3568 .clone()
3569 .unwrap_or_else(|| config.default_facets.clone())
3570 .into_iter()
3571 .map(Vec::from)
3572 .collect()
3573}
3574
3575fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3576 get_str(object, "histogram")
3577 .filter(|histogram| !histogram.is_empty())
3578 .map(ToOwned::to_owned)
3579}
3580
3581fn request_histogram_or_default(
3582 requested_histogram: &Option<String>,
3583 config: &NetdataFunctionConfig,
3584) -> Option<String> {
3585 requested_histogram
3586 .clone()
3587 .or_else(|| config.default_histogram.clone())
3588}
3589
3590fn request_query(object: &Map<String, Value>) -> Option<String> {
3591 get_str(object, "query")
3592 .filter(|query| !query.is_empty())
3593 .map(ToOwned::to_owned)
3594}
3595
3596fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3597 object.get(key).and_then(Value::as_bool)
3598}
3599
3600fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3601 object.get(key).and_then(Value::as_i64)
3602}
3603
3604fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3605 object.get(key).and_then(Value::as_u64)
3606}
3607
3608fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3609 object.get(key).and_then(Value::as_str)
3610}
3611
3612fn normalize_time_window(
3613 now_seconds: i64,
3614 after: Option<i64>,
3615 before: Option<i64>,
3616) -> (Option<u64>, Option<u64>) {
3617 let mut after = after.unwrap_or(0);
3618 let mut before = before.unwrap_or(0);
3619
3620 if after == 0 && before == 0 {
3621 before = now_seconds;
3622 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3623 } else {
3624 (after, before) = relative_window_to_absolute(now_seconds, after, before);
3625 }
3626
3627 if after > before {
3628 std::mem::swap(&mut after, &mut before);
3629 }
3630 if after == before {
3631 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3632 }
3633
3634 (
3635 Some(normalize_timestamp_to_usec_with_rounding(
3636 after.max(0) as u64,
3637 false,
3638 )),
3639 Some(normalize_timestamp_to_usec_with_rounding(
3640 before.max(0) as u64,
3641 true,
3642 )),
3643 )
3644}
3645
3646fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3647 let mut after = after;
3648 let mut before = before;
3649
3650 if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3651 if before > 0 {
3652 before = -before;
3653 }
3654 before = now_seconds.saturating_add(before);
3655 }
3656
3657 if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3658 if after > 0 {
3659 after = -after;
3660 }
3661 if after == 0 {
3662 after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3663 }
3664 after = before.saturating_add(after).saturating_add(1);
3665 }
3666
3667 if after > before {
3668 std::mem::swap(&mut after, &mut before);
3669 }
3670
3671 if before > now_seconds {
3672 let delta = before.saturating_sub(now_seconds);
3673 before = before.saturating_sub(delta);
3674 after = after.saturating_sub(delta);
3675 }
3676
3677 (after, before)
3678}
3679
/// Normalized request parameters that `normalized_request_echo` renders into
/// the echoed request object.
struct RequestEchoInput<'a> {
    info: bool,
    /// Lower time bound in microseconds; echoed in whole seconds.
    after_realtime_usec: Option<u64>,
    /// Upper time bound in microseconds; echoed in whole seconds.
    before_realtime_usec: Option<u64>,
    if_modified_since_usec: u64,
    /// Only a `Realtime` anchor is echoed with its value; other anchors echo 0.
    anchor: ExplorerAnchor,
    direction: Direction,
    /// Echoed under the `last` key.
    limit: usize,
    data_only: bool,
    delta: bool,
    tail: bool,
    sampling: u64,
    source_type: u64,
    /// Echoed under `facets` only when present.
    requested_facets: Option<&'a [String]>,
    /// Echoed under `selections` only when it is a JSON object.
    selections: Option<&'a Value>,
    histogram: Option<&'a str>,
    query: Option<&'a str>,
}
3698
/// Renders the normalized request parameters as the JSON object echoed back to
/// the caller.
///
/// `facets` and `selections` are added only when the input provides them.
fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
    // Non-realtime anchors have no timestamp to report and are echoed as 0.
    let anchor_usec = match input.anchor {
        ExplorerAnchor::Realtime(usec) => usec,
        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
    };
    let mut out = json!({
        "info": input.info,
        "slice": true,
        "data_only": input.data_only,
        "delta": input.delta,
        "tail": input.tail,
        "sampling": input.sampling,
        "source_type": input.source_type,
        // The window is kept in microseconds internally but echoed in seconds.
        "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
        "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
        "if_modified_since": input.if_modified_since_usec,
        "anchor": anchor_usec,
        "direction": match input.direction {
            Direction::Forward => "forward",
            Direction::Backward => "backward",
        },
        "last": input.limit,
        "query": input.query,
        "histogram": input.histogram,
    });
    if let Some(facets) = input.requested_facets {
        if let Some(object) = out.as_object_mut() {
            object.insert(
                "facets".to_string(),
                facets
                    .iter()
                    .map(|field| Value::String(field.clone()))
                    .collect(),
            );
        }
    }
    if let Some(Value::Object(selections)) = input.selections {
        let mut selections = selections.clone();
        // Every `__logs_sources` entry is replaced with null in the echoed
        // copy; the array length is preserved.
        // NOTE(review): presumably mirrors the reference implementation's
        // output — confirm this is intentional.
        if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
            for source in sources {
                *source = Value::Null;
            }
        }
        if let Some(object) = out.as_object_mut() {
            object.insert("selections".to_string(), Value::Object(selections));
        }
    }
    out
}
3751
3752fn normalize_timestamp_to_usec(value: u64) -> u64 {
3753 normalize_timestamp_to_usec_with_rounding(value, false)
3754}
3755
/// Normalizes a timestamp to microseconds.
///
/// Values of at least `1_000_000_000_000` are returned unchanged. Smaller
/// values are multiplied by one million (saturating); with `end_of_second`
/// set, 999_999 is added so the result is the last microsecond of that second.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const ALREADY_USEC_THRESHOLD: u64 = 1_000_000_000_000;
    const USEC_PER_SECOND: u64 = 1_000_000;

    if value >= ALREADY_USEC_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(USEC_PER_SECOND);
    if end_of_second {
        start_of_second.saturating_add(USEC_PER_SECOND - 1)
    } else {
        start_of_second
    }
}
3765
/// Current wall-clock time as whole seconds since the Unix epoch, or 0 when
/// the system clock reports a time before the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3772
/// Formats a byte count with binary units (`B` through `TiB`).
///
/// Plain bytes are printed exactly. Scaled values are printed without a
/// fraction when they are whole, otherwise rounded to two decimals with
/// trailing zeros (and a dangling decimal point) removed.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];

    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index + 1 < UNITS.len() {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let unit = UNITS[unit_index];

    if unit_index == 0 {
        return format!("{bytes}{unit}");
    }
    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{unit}");
    }

    // `{:.2}` always emits a decimal point, so zero-trimming stops there.
    let rounded = format!("{scaled:.2}");
    let trimmed = rounded.trim_end_matches('0');
    let trimmed = trimmed.strip_suffix('.').unwrap_or(trimmed);
    format!("{trimmed}{unit}")
}
3796
/// Formats a duration in seconds as space-separated parts such as
/// `1y 2mo 3d 4h 5m 6s`.
///
/// A year counts as 365 days and a month as 30 days. Zero-valued parts are
/// omitted, except that a zero duration is rendered as `0s`.
fn human_duration_seconds(seconds: u64) -> String {
    const UNITS: &[(u64, &str)] = &[
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for &(span, suffix) in UNITS {
        let count = remaining / span;
        remaining %= span;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3828
/// Outcome of scanning a directory tree for journal files.
#[derive(Debug, Default)]
struct JournalFileCollection {
    /// Journal files found, sorted and de-duplicated by canonical path.
    files: Vec<PathBuf>,
    /// Number of directories that were not scanned (unreadable, or hit after
    /// the scan limit was reached).
    skipped: u64,
    /// One human-readable message per skipped directory.
    errors: Vec<String>,
}
3835
/// Collects every journal file found under `path`, scanning subdirectories
/// breadth-first.
///
/// # Errors
///
/// Returns `SdkError::InvalidPath` when `path` is not a directory, and the
/// underlying I/O error when `path` itself cannot be listed. Failures on
/// nested directories are not fatal: they are counted in `skipped` and
/// described in `errors`.
fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
    if !path.is_dir() {
        return Err(SdkError::InvalidPath(format!(
            "not a directory: {}",
            path.display()
        )));
    }
    let mut collection = JournalFileCollection::default();
    // Queue of (directory, depth) pairs still to scan; the root is depth 0.
    let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
    let mut visited = HashSet::new();
    while let Some((directory, depth)) = pending.pop_front() {
        // Track directories by canonical path when it resolves, so a directory
        // reachable through several paths is scanned only once.
        let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
        if visited.contains(&visited_key) {
            continue;
        }
        // Cap the number of distinct directories scanned; every directory
        // dequeued past the cap is reported as skipped.
        if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
            collection.skipped = collection.skipped.saturating_add(1);
            collection.errors.push(format!(
                "{}: directory scan limit reached",
                directory.display()
            ));
            continue;
        }
        visited.insert(visited_key);
        let entries = match std::fs::read_dir(&directory) {
            Ok(entries) => entries,
            // Failing to list the root is fatal; nested failures are recorded.
            Err(err) if directory == path => return Err(err.into()),
            Err(err) => {
                collection.skipped = collection.skipped.saturating_add(1);
                collection
                    .errors
                    .push(format!("{}: {err}", directory.display()));
                continue;
            }
        };
        // `flatten` drops directory entries that could not be read.
        for entry in entries.flatten() {
            let entry_path = entry.path();
            if entry_path.is_file() && is_journal_file_name(&entry_path) {
                collection.files.push(entry_path);
            } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
                // Subdirectories below the depth limit are silently ignored.
                pending.push_back((entry_path, depth + 1));
            }
        }
    }
    collection.files.sort();
    dedup_journal_files_by_canonical_path(&mut collection.files);
    Ok(collection)
}
3884
/// Removes later entries that resolve to the same canonical path as an
/// earlier one, keeping the first occurrence. A path that cannot be
/// canonicalized is compared as written.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut seen = HashSet::with_capacity(files.len());
    files.retain(|path| {
        let canonical = match std::fs::canonicalize(path) {
            Ok(resolved) => resolved,
            Err(_) => path.clone(),
        };
        // `insert` is false for a key already present, which drops the entry.
        seen.insert(canonical)
    });
}
3892
/// Timestamps used to order journal files relative to each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime of the first message; 0 when the file could not be opened.
    msg_first_realtime_usec: u64,
    /// Realtime of the last message, falling back to the file's modification
    /// time when unknown.
    msg_last_realtime_usec: u64,
    /// File modification time in microseconds since the Unix epoch.
    file_last_modified_usec: u64,
    journal_vs_realtime_delta_usec: u64,
}
3900
3901fn journal_file_order_info(
3902 path: &Path,
3903 reader_options: ReaderOptions,
3904 metadata: Option<&NetdataJournalFileMetadata>,
3905) -> JournalFileOrderInfo {
3906 let file_last_modified_usec = std::fs::metadata(path)
3907 .ok()
3908 .and_then(|metadata| metadata.modified().ok())
3909 .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3910 .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3911 .unwrap_or_default();
3912 let file_last_modified_usec = metadata
3913 .and_then(|metadata| metadata.file_last_modified_usec)
3914 .unwrap_or(file_last_modified_usec);
3915 let journal_vs_realtime_delta_usec = metadata
3916 .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3917 .map(normalize_journal_vs_realtime_delta_usec)
3918 .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3919
3920 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3921 return JournalFileOrderInfo {
3922 msg_first_realtime_usec: 0,
3923 msg_last_realtime_usec: file_last_modified_usec,
3924 file_last_modified_usec,
3925 journal_vs_realtime_delta_usec,
3926 };
3927 };
3928 let header = reader.header();
3929 let msg_first_realtime_usec = metadata
3930 .and_then(|metadata| metadata.msg_first_realtime_usec)
3931 .unwrap_or(header.head_entry_realtime);
3932 let msg_last_realtime_usec = metadata
3933 .and_then(|metadata| metadata.msg_last_realtime_usec)
3934 .unwrap_or_else(|| {
3935 if header.tail_entry_realtime == 0 {
3936 file_last_modified_usec
3937 } else {
3938 header.tail_entry_realtime
3939 }
3940 });
3941 JournalFileOrderInfo {
3942 msg_first_realtime_usec,
3943 msg_last_realtime_usec,
3944 file_last_modified_usec,
3945 journal_vs_realtime_delta_usec,
3946 }
3947}
3948
3949fn compare_journal_file_order(
3950 left: &JournalFileOrderInfo,
3951 right: &JournalFileOrderInfo,
3952 direction: Direction,
3953) -> Ordering {
3954 let backward = right
3955 .msg_last_realtime_usec
3956 .cmp(&left.msg_last_realtime_usec)
3957 .then_with(|| {
3958 right
3959 .file_last_modified_usec
3960 .cmp(&left.file_last_modified_usec)
3961 })
3962 .then_with(|| {
3963 right
3964 .msg_first_realtime_usec
3965 .cmp(&left.msg_first_realtime_usec)
3966 });
3967 match direction {
3968 Direction::Backward => backward,
3969 Direction::Forward => backward.reverse(),
3970 }
3971}
3972
/// Reports whether the path's file name ends with one of the journal file
/// suffixes: `.journal`, `.journal~`, `.journal.zst` or `.journal~.zst`.
/// Paths without a UTF-8 file name never match.
fn is_journal_file_name(path: &Path) -> bool {
    const SUFFIXES: &[&str] = &[".journal", ".journal~", ".journal.zst", ".journal~.zst"];

    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => SUFFIXES.iter().any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3983
3984fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3985 for value in values {
3986 push_unique(target, value);
3987 }
3988}
3989
/// Converts byte-string field names to `String`s, dropping any that are not
/// valid UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    fields
        .iter()
        .filter_map(|field| std::str::from_utf8(field).ok())
        .map(str::to_owned)
        .collect()
}
3996
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    let already_present = target.iter().any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
4003
/// Builds a reorder key: leading ASCII punctuation is stripped and ASCII
/// letters are lowercased.
fn netdata_reorder_key(value: &str) -> String {
    let stripped = value
        .find(|character: char| !character.is_ascii_punctuation())
        .map_or("", |start| &value[start..]);
    stripped.to_ascii_lowercase()
}
4009
4010fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
4011 histogram
4012 .buckets
4013 .first()
4014 .map(|bucket| {
4015 bucket
4016 .end_realtime_usec
4017 .saturating_sub(bucket.start_realtime_usec)
4018 .checked_div(1_000_000)
4019 .unwrap_or(1)
4020 .max(1)
4021 })
4022 .unwrap_or(1)
4023}
4024
/// Level of detail used by `format_realtime_usec`.
enum TimestampPrecision {
    /// Whole seconds, no fractional part.
    Seconds,
    /// Six fractional digits (microseconds).
    Micros,
}
4029
4030fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
4031 let seconds = (timestamp / 1_000_000) as i64;
4032 let micros = (timestamp % 1_000_000) as u32;
4033 DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4034 .map(|datetime| match precision {
4035 TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4036 TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4037 })
4038 .unwrap_or_else(|| timestamp.to_string())
4039}
4040
4041fn priority_name(raw: &str) -> Option<&'static str> {
4042 match parse_priority(raw)? {
4043 0 => Some("panic"),
4044 1 => Some("alert"),
4045 2 => Some("critical"),
4046 3 => Some("error"),
4047 4 => Some("warning"),
4048 5 => Some("notice"),
4049 6 => Some("info"),
4050 7 => Some("debug"),
4051 _ => None,
4052 }
4053}
4054
/// Maps a priority name or one of its aliases to the priority number as a
/// string. Matching is case-sensitive; unknown names yield `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: &[(&str, &str)] = &[
        ("panic", "0"),
        ("emergency", "0"),
        ("emerg", "0"),
        ("alert", "1"),
        ("critical", "2"),
        ("crit", "2"),
        ("error", "3"),
        ("err", "3"),
        ("warning", "4"),
        ("warn", "4"),
        ("notice", "5"),
        ("info", "6"),
        ("debug", "7"),
    ];
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == value)
        .map(|&(_, number)| number)
}
4068
/// Parses a priority value as a `u8`, returning `None` for anything that is
/// not a number in that range.
fn parse_priority(raw: &str) -> Option<u8> {
    match raw.parse::<u8>() {
        Ok(priority) => Some(priority),
        Err(_) => None,
    }
}
4072
4073fn priority_to_row_severity(raw: &[u8]) -> &'static str {
4074 let raw = String::from_utf8_lossy(raw);
4075 match parse_priority(&raw) {
4076 Some(priority) if priority <= 3 => "critical",
4077 Some(4) => "warning",
4078 Some(5) => "notice",
4079 Some(priority) if priority >= 7 => "debug",
4080 _ => "normal",
4081 }
4082}
4083
/// Maps a numeric syslog facility string to its name. Codes 0-11 and 16-23
/// are known; everything else yields `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    const FACILITIES: &[(u8, &str)] = &[
        (0, "kern"),
        (1, "user"),
        (2, "mail"),
        (3, "daemon"),
        (4, "auth"),
        (5, "syslog"),
        (6, "lpr"),
        (7, "news"),
        (8, "uucp"),
        (9, "cron"),
        (10, "authpriv"),
        (11, "ftp"),
        (16, "local0"),
        (17, "local1"),
        (18, "local2"),
        (19, "local3"),
        (20, "local4"),
        (21, "local5"),
        (22, "local6"),
        (23, "local7"),
    ];
    let code = raw.parse::<u8>().ok()?;
    FACILITIES
        .iter()
        .find(|&&(candidate, _)| candidate == code)
        .map(|&(_, name)| name)
}
4109
/// Errno numbers paired with their symbolic names, in ascending numeric
/// order. Numbers 41 and 58 have no entry.
// NOTE(review): the values look like the Linux errno numbering — confirm
// before relying on this table for other platforms.
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"),
    (2, "ENOENT"),
    (3, "ESRCH"),
    (4, "EINTR"),
    (5, "EIO"),
    (6, "ENXIO"),
    (7, "E2BIG"),
    (8, "ENOEXEC"),
    (9, "EBADF"),
    (10, "ECHILD"),
    (11, "EAGAIN"),
    (12, "ENOMEM"),
    (13, "EACCES"),
    (14, "EFAULT"),
    (15, "ENOTBLK"),
    (16, "EBUSY"),
    (17, "EEXIST"),
    (18, "EXDEV"),
    (19, "ENODEV"),
    (20, "ENOTDIR"),
    (21, "EISDIR"),
    (22, "EINVAL"),
    (23, "ENFILE"),
    (24, "EMFILE"),
    (25, "ENOTTY"),
    (26, "ETXTBSY"),
    (27, "EFBIG"),
    (28, "ENOSPC"),
    (29, "ESPIPE"),
    (30, "EROFS"),
    (31, "EMLINK"),
    (32, "EPIPE"),
    (33, "EDOM"),
    (34, "ERANGE"),
    (35, "EDEADLK"),
    (36, "ENAMETOOLONG"),
    (37, "ENOLCK"),
    (38, "ENOSYS"),
    (39, "ENOTEMPTY"),
    (40, "ELOOP"),
    (42, "ENOMSG"),
    (43, "EIDRM"),
    (44, "ECHRNG"),
    (45, "EL2NSYNC"),
    (46, "EL3HLT"),
    (47, "EL3RST"),
    (48, "ELNRNG"),
    (49, "EUNATCH"),
    (50, "ENOCSI"),
    (51, "EL2HLT"),
    (52, "EBADE"),
    (53, "EBADR"),
    (54, "EXFULL"),
    (55, "ENOANO"),
    (56, "EBADRQC"),
    (57, "EBADSLT"),
    (59, "EBFONT"),
    (60, "ENOSTR"),
    (61, "ENODATA"),
    (62, "ETIME"),
    (63, "ENOSR"),
    (64, "ENONET"),
    (65, "ENOPKG"),
    (66, "EREMOTE"),
    (67, "ENOLINK"),
    (68, "EADV"),
    (69, "ESRMNT"),
    (70, "ECOMM"),
    (71, "EPROTO"),
    (72, "EMULTIHOP"),
    (73, "EDOTDOT"),
    (74, "EBADMSG"),
    (75, "EOVERFLOW"),
    (76, "ENOTUNIQ"),
    (77, "EBADFD"),
    (78, "EREMCHG"),
    (79, "ELIBACC"),
    (80, "ELIBBAD"),
    (81, "ELIBSCN"),
    (82, "ELIBMAX"),
    (83, "ELIBEXEC"),
    (84, "EILSEQ"),
    (85, "ERESTART"),
    (86, "ESTRPIPE"),
    (87, "EUSERS"),
    (88, "ENOTSOCK"),
    (89, "EDESTADDRREQ"),
    (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"),
    (92, "ENOPROTOOPT"),
    (93, "EPROTONOSUPPORT"),
    (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"),
    (96, "EPFNOSUPPORT"),
    (97, "EAFNOSUPPORT"),
    (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"),
    (100, "ENETDOWN"),
    (101, "ENETUNREACH"),
    (102, "ENETRESET"),
    (103, "ECONNABORTED"),
    (104, "ECONNRESET"),
    (105, "ENOBUFS"),
    (106, "EISCONN"),
    (107, "ENOTCONN"),
    (108, "ESHUTDOWN"),
    (109, "ETOOMANYREFS"),
    (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"),
    (112, "EHOSTDOWN"),
    (113, "EHOSTUNREACH"),
    (114, "EALREADY"),
    (115, "EINPROGRESS"),
    (116, "ESTALE"),
    (117, "EUCLEAN"),
    (118, "ENOTNAM"),
    (119, "ENAVAIL"),
    (120, "EISNAM"),
    (121, "EREMOTEIO"),
    (122, "EDQUOT"),
    (123, "ENOMEDIUM"),
    (124, "EMEDIUMTYPE"),
    (125, "ECANCELED"),
    (126, "ENOKEY"),
    (127, "EKEYEXPIRED"),
    (128, "EKEYREVOKED"),
    (129, "EKEYREJECTED"),
    (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"),
    (132, "ERFKILL"),
    (133, "EHWPOISON"),
];
4243
4244fn errno_name(raw: &str) -> Option<String> {
4245 let errno = raw.parse::<u32>().ok()?;
4246 let name = ERRNO_NAMES
4247 .iter()
4248 .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
4249 Some(format!("{errno} ({name})"))
4250}
4251
/// Annotates a hexadecimal capability mask with the names of its set bits,
/// e.g. `"3"` becomes `"3 (CHOWN | DAC_OVERRIDE)"`.
///
/// The input is returned unchanged when it does not start with an ASCII
/// digit, is not valid hexadecimal for a `u64`, is zero, or sets no bit that
/// has a name in the table.
fn cap_effective_display(raw: &str) -> String {
    // Capability names indexed by bit position.
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    if !starts_with_digit {
        return raw.to_string();
    }
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names: Vec<&str> = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if (mask & (1u64 << bit)) != 0 {
            names.push(*name);
        }
    }
    if names.is_empty() {
        return raw.to_string();
    }
    format!("{raw} ({})", names.join(" | "))
}
4316
4317fn systemd_field_display_value(
4318 context: &DisplayContext,
4319 scope: DisplayScope,
4320 field: &str,
4321 value: &[u8],
4322 resolve_user_group_names: bool,
4323) -> Value {
4324 let raw = String::from_utf8_lossy(value);
4325 match field {
4326 "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
4327 "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
4328 "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
4329 "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
4330 (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
4331 (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
4332 (None, _) => raw.into_owned(),
4333 }),
4334 "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
4335 (Some(timestamp), DisplayScope::Data) => format!(
4336 "{} ({}) ",
4337 raw,
4338 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4339 ),
4340 (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
4341 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4342 }
4343 (None, _) => raw.into_owned(),
4344 }),
4345 "_UID"
4346 | "_SYSTEMD_OWNER_UID"
4347 | "OBJECT_SYSTEMD_OWNER_UID"
4348 | "OBJECT_UID"
4349 | "_AUDIT_LOGINUID"
4350 | "OBJECT_AUDIT_LOGINUID" => {
4351 if resolve_user_group_names {
4352 Value::String(cached_uid_display(context, raw.as_ref()))
4353 } else {
4354 Value::String(raw.into_owned())
4355 }
4356 }
4357 "_GID" | "OBJECT_GID" => {
4358 if resolve_user_group_names {
4359 Value::String(cached_gid_display(context, raw.as_ref()))
4360 } else {
4361 Value::String(raw.into_owned())
4362 }
4363 }
4364 "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
4365 "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
4366 Ok(timestamp) if timestamp != 0 => {
4367 format!(
4368 "{} ({})",
4369 raw,
4370 format_realtime_usec(timestamp, TimestampPrecision::Micros)
4371 )
4372 }
4373 _ => raw.into_owned(),
4374 }),
4375 _ => Value::String(raw.into_owned()),
4376 }
4377}
4378
4379fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4380 if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4381 return value.clone();
4382 }
4383 let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4384 context
4385 .uid_display_cache
4386 .borrow_mut()
4387 .insert(raw.to_string(), value.clone());
4388 value
4389}
4390
4391fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4392 if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4393 return value.clone();
4394 }
4395 let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4396 context
4397 .gid_display_cache
4398 .borrow_mut()
4399 .insert(raw.to_string(), value.clone());
4400 value
4401}
4402
4403#[cfg(unix)]
4404fn resolve_uid_name(raw: &str) -> Option<String> {
4405 let uid = raw.parse::<libc::uid_t>().ok()?;
4406 let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4407 let mut result = std::ptr::null_mut();
4408 let mut buffer = vec![0i8; 16_384];
4409 let rc = unsafe {
4410 libc::getpwuid_r(
4411 uid,
4412 pwd.as_mut_ptr(),
4413 buffer.as_mut_ptr(),
4414 buffer.len(),
4415 &mut result,
4416 )
4417 };
4418 if rc != 0 || result.is_null() {
4419 return None;
4420 }
4421 let pwd = unsafe { pwd.assume_init() };
4422 Some(
4423 unsafe { CStr::from_ptr(pwd.pw_name) }
4424 .to_string_lossy()
4425 .into_owned(),
4426 )
4427}
4428
4429#[cfg(not(unix))]
4430fn resolve_uid_name(_raw: &str) -> Option<String> {
4431 None
4432}
4433
4434#[cfg(unix)]
4435fn resolve_gid_name(raw: &str) -> Option<String> {
4436 let gid = raw.parse::<libc::gid_t>().ok()?;
4437 let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4438 let mut result = std::ptr::null_mut();
4439 let mut buffer = vec![0i8; 16_384];
4440 let rc = unsafe {
4441 libc::getgrgid_r(
4442 gid,
4443 grp.as_mut_ptr(),
4444 buffer.as_mut_ptr(),
4445 buffer.len(),
4446 &mut result,
4447 )
4448 };
4449 if rc != 0 || result.is_null() {
4450 return None;
4451 }
4452 let grp = unsafe { grp.assume_init() };
4453 Some(
4454 unsafe { CStr::from_ptr(grp.gr_name) }
4455 .to_string_lossy()
4456 .into_owned(),
4457 )
4458}
4459
4460#[cfg(not(unix))]
4461fn resolve_gid_name(_raw: &str) -> Option<String> {
4462 None
4463}
4464
/// Human-readable names for known `MESSAGE_ID` values.
///
/// Each entry pairs a message id, written as 32 lowercase hexadecimal digits
/// without dashes, with the label to display for it. `message_id_name` scans
/// this table linearly and compares ids by exact string equality.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4780
4781fn message_id_name(raw: &str) -> Option<&'static str> {
4782 MESSAGE_ID_NAMES
4783 .iter()
4784 .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4785}
4786
4787#[cfg(test)]
4788mod tests {
4789 use super::*;
4790 use crate::ExplorerHistogramBucket;
4791 use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4792 use journal_core::repository::File as RepoFile;
4793 use std::cell::Cell;
4794 use std::collections::HashMap;
4795 use tempfile::TempDir;
4796
    /// In-memory `NetdataFunctionState` for tests: it answers metadata lookups
    /// from a map and records every delta update it is asked to store.
    #[derive(Default)]
    struct TestNetdataState {
        // Metadata returned by `file_metadata`, keyed by journal file path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        // `(path, delta_usec)` pairs in the order they were reported.
        updates: Vec<(PathBuf, u64)>,
    }
4802
4803 impl NetdataFunctionState for TestNetdataState {
4804 fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4805 self.metadata.get(path).cloned()
4806 }
4807
4808 fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4809 self.updates.push((path.to_path_buf(), delta_usec));
4810 }
4811 }
4812
4813 fn test_uuid(seed: u8) -> uuid::Uuid {
4814 uuid::Uuid::from_bytes([seed; 16])
4815 }
4816
4817 fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4818 write_named_netdata_test_journal(
4819 directory,
4820 "netdata-api-test.journal",
4821 count,
4822 1_700_000_000_000_000,
4823 );
4824 }
4825
4826 fn write_named_netdata_test_journal(
4827 directory: &std::path::Path,
4828 name: &str,
4829 count: usize,
4830 start_realtime_usec: u64,
4831 ) {
4832 write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4833 }
4834
4835 fn write_stepped_netdata_test_journal(
4836 directory: &std::path::Path,
4837 name: &str,
4838 count: usize,
4839 start_realtime_usec: u64,
4840 step_realtime_usec: u64,
4841 ) {
4842 std::fs::create_dir_all(directory).expect("create journal dir");
4843 let path = directory.join(name);
4844 let repo_file = RepoFile::from_path(&path).expect("repo file");
4845 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4846 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4847 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4848 for index in 0..count {
4849 let message = format!("MESSAGE=row-{index}");
4850 let service = if index % 2 == 0 {
4851 b"SERVICE=even".as_slice()
4852 } else {
4853 b"SERVICE=odd".as_slice()
4854 };
4855 let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4856 let realtime = start_realtime_usec
4857 .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4858 writer
4859 .add_entry(&mut file, &payloads, realtime, realtime)
4860 .expect("write entry");
4861 }
4862 file.sync().expect("sync journal");
4863 }
4864
    /// Rows gathered across all pages by `collect_netdata_pages`; the two
    /// vectors are filled in lockstep, one element per returned row.
    struct NetdataCollectedPages {
        // `MESSAGE` column values in the order the pages returned them.
        messages: Vec<String>,
        // `timestamp` column values in the order the pages returned them.
        timestamps: Vec<u64>,
    }
4869
4870 fn collect_netdata_pages(
4871 directory: &std::path::Path,
4872 direction: Direction,
4873 page_size: usize,
4874 ) -> NetdataCollectedPages {
4875 let mut out = NetdataCollectedPages {
4876 messages: Vec::new(),
4877 timestamps: Vec::new(),
4878 };
4879 let mut anchor = None;
4880 for page in 0..100 {
4881 let mut request = json!({
4882 "after": 1_700_000_000,
4883 "before": 1_800_000_000,
4884 "last": page_size,
4885 "direction": direction_name(direction),
4886 "data_only": true,
4887 });
4888 if let Some(anchor) = anchor {
4889 request["anchor"] = Value::from(anchor);
4890 }
4891 let response = run_netdata_contract_request(directory, request);
4892 assert_eq!(
4893 response["status"], 200,
4894 "page {page} response = {response:#}"
4895 );
4896 let messages = response_column_strings(&response, "MESSAGE");
4897 let timestamps = response_column_u64s(&response, "timestamp");
4898 assert_eq!(messages.len(), timestamps.len());
4899 if messages.is_empty() {
4900 break;
4901 }
4902 out.messages.extend(messages);
4903 out.timestamps.extend(timestamps.iter().copied());
4904 anchor = Some(match direction {
4905 Direction::Forward => timestamps[0],
4906 Direction::Backward => *timestamps.last().expect("page timestamp"),
4907 });
4908 if timestamps.len() < page_size {
4909 break;
4910 }
4911 }
4912 assert!(!out.messages.is_empty(), "pagination returned no rows");
4913 out
4914 }
4915
4916 fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4917 NetdataJournalFunction::systemd_journal_plugin_compatible()
4918 .run_directory_request_json_with_options(
4919 directory,
4920 &request,
4921 NetdataFunctionRunOptions::from_timeout_seconds(0),
4922 )
4923 .expect("run function")
4924 }
4925
4926 fn direction_name(direction: Direction) -> &'static str {
4927 match direction {
4928 Direction::Forward => "forward",
4929 Direction::Backward => "backward",
4930 }
4931 }
4932
4933 fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4934 let index = response_column_index(response, field);
4935 response["data"]
4936 .as_array()
4937 .expect("data")
4938 .iter()
4939 .map(|row| {
4940 row.as_array().expect("row")[index]
4941 .as_str()
4942 .expect("string cell")
4943 .to_string()
4944 })
4945 .collect()
4946 }
4947
4948 fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4949 let index = response_column_index(response, field);
4950 response["data"]
4951 .as_array()
4952 .expect("data")
4953 .iter()
4954 .map(|row| {
4955 row.as_array().expect("row")[index]
4956 .as_u64()
4957 .expect("u64 cell")
4958 })
4959 .collect()
4960 }
4961
4962 fn response_column_index(response: &Value, field: &str) -> usize {
4963 response["columns"][field]["index"]
4964 .as_u64()
4965 .expect("column index") as usize
4966 }
4967
4968 fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4969 for facet in response[key].as_array().expect("facets") {
4970 if facet["id"] != field {
4971 continue;
4972 }
4973 for option in facet["options"].as_array().expect("facet options") {
4974 if option["id"] == value {
4975 return option["count"].as_u64().expect("facet count");
4976 }
4977 }
4978 panic!("{key} facet {field} missing value {value}");
4979 }
4980 panic!("{key} missing facet {field}");
4981 }
4982
4983 fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4984 let chart = &response[key]["chart"];
4985 let names = chart["view"]["dimensions"]["names"]
4986 .as_array()
4987 .expect("histogram names");
4988 let dimension_index = names
4989 .iter()
4990 .position(|name| name.as_str() == Some(value))
4991 .expect("histogram dimension");
4992 chart["result"]["data"]
4993 .as_array()
4994 .expect("histogram data")
4995 .iter()
4996 .map(|point| {
4997 point.as_array().expect("histogram point")[dimension_index + 1]
4998 .as_array()
4999 .expect("histogram cell")[0]
5000 .as_u64()
5001 .unwrap_or_default()
5002 })
5003 .sum()
5004 }
5005
5006 fn assert_unique_messages(messages: &[String]) {
5007 let mut seen = HashSet::new();
5008 for message in messages {
5009 assert!(
5010 seen.insert(message),
5011 "duplicate message {message} in {messages:?}"
5012 );
5013 }
5014 }
5015
5016 #[test]
5017 fn parses_netdata_selections_as_and_fields_or_values() {
5018 let request = json!({
5019 "after": 200_000_000,
5020 "before": 200_000_100,
5021 "direction": "forward",
5022 "last": 25,
5023 "facets": ["PRIORITY"],
5024 "selections": {
5025 "PRIORITY": ["warning", "error"],
5026 "_HOSTNAME": ["node-a"],
5027 "__logs_sources": ["all-local-system-logs"],
5028 }
5029 });
5030
5031 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5032 .expect("parse request");
5033 assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5034 assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5035 assert_eq!(parsed.direction, Direction::Forward);
5036 assert_eq!(parsed.limit, 25);
5037 assert_eq!(parsed.filters.len(), 2);
5038 assert_eq!(parsed.filters[0].field, b"PRIORITY");
5039 assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5040 assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5041 assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5042 }
5043
5044 #[test]
5045 fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5046 let request = json!({
5047 "after": 200_000_000,
5048 "before": 200_000_100,
5049 "last": 1
5050 });
5051
5052 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5053 .expect("parse request");
5054 let query =
5055 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5056
5057 assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5058 assert_eq!(parsed.limit, 2);
5059 assert_eq!(query.limit, 2);
5060 }
5061
5062 #[test]
5063 fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5064 let request = json!({
5065 "after": 200_000_000,
5066 "before": 200_000_100,
5067 "facets": ["PRIORITY"],
5068 "selections": {
5069 "PRIORITY": ["warning"]
5070 }
5071 });
5072
5073 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5074 .expect("parse request");
5075 let query =
5076 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5077
5078 assert!(!query.exclude_facet_field_filters);
5079 }
5080
5081 #[test]
5082 fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5083 let request = json!({
5084 "after": 200_000_000,
5085 "before": 200_000_100,
5086 "facets": ["PRIORITY", "_BOOT_ID"],
5087 "selections": {
5088 "PRIORITY": ["warning"],
5089 "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5090 }
5091 });
5092
5093 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5094 .expect("parse request");
5095 let query =
5096 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5097
5098 assert!(query.exclude_facet_field_filters);
5099 }
5100
5101 #[test]
5102 fn parses_netdata_fts_query_like_simple_pattern() {
5103 let (terms, positives, negatives) =
5104 parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5105
5106 assert_eq!(
5107 positives,
5108 vec![
5109 b"error".to_vec(),
5110 b"warning".to_vec(),
5111 b"escaped|pipe".to_vec(),
5112 b"!literal".to_vec(),
5113 b" a*B".to_vec(),
5114 ]
5115 );
5116 assert_eq!(negatives, vec![b"debug".to_vec()]);
5117 assert_eq!(terms.len(), 6);
5118 assert!(!terms[0].negative);
5119 assert!(terms[2].negative);
5120 assert_eq!(
5121 terms[5],
5122 ExplorerFtsPattern {
5123 parts: vec![b" a".to_vec(), b"B".to_vec()],
5124 negative: false,
5125 }
5126 );
5127
5128 let request = json!({
5129 "query": r"alpha|!debug|needle\|pipe",
5130 "facets": ["PRIORITY"],
5131 });
5132 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5133 .expect("parse request");
5134 let query =
5135 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5136 assert_eq!(
5137 query.fts_patterns,
5138 vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5139 );
5140 assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5141 assert_eq!(query.fts_terms.len(), 3);
5142 }
5143
5144 #[test]
5145 fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5146 let request = json!({
5147 "facets": ["PRIORITY", "_HOSTNAME"],
5148 "histogram": "PRIORITY",
5149 "last": 25
5150 });
5151
5152 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5153 .expect("parse request");
5154 let query =
5155 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5156
5157 assert!(!query.debug_collect_column_fields_by_row_traversal);
5158 }
5159
5160 #[test]
5161 fn netdata_function_reports_requested_facet_groups_even_when_absent() {
5162 let dir = TempDir::new().expect("tempdir");
5163 write_netdata_test_journal(dir.path(), 10);
5164 let request = json!({
5165 "after": 1_700_000_000,
5166 "before": 1_700_000_010,
5167 "facets": ["SERVICE", "MISSING_FIELD"],
5168 "histogram": "SERVICE",
5169 "last": 5,
5170 "slice": true
5171 });
5172 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5173
5174 let response = function
5175 .run_directory_request_json_with_options(
5176 dir.path(),
5177 &request,
5178 NetdataFunctionRunOptions::from_timeout_seconds(0),
5179 )
5180 .expect("run function");
5181
5182 let columns = response["columns"].as_object().expect("columns");
5183 assert!(columns.contains_key("SERVICE"));
5184 assert!(columns.contains_key("MISSING_FIELD"));
5185 let facets = response["facets"].as_array().expect("facets");
5186 assert_eq!(
5187 facets
5188 .iter()
5189 .filter_map(|facet| facet["id"].as_str())
5190 .collect::<Vec<_>>(),
5191 vec!["SERVICE", "MISSING_FIELD"]
5192 );
5193 assert_eq!(
5194 facets[1]["options"].as_array().expect("missing options"),
5195 &Vec::<Value>::new()
5196 );
5197 let accepted = response["accepted_params"]
5198 .as_array()
5199 .expect("accepted params");
5200 assert!(accepted.iter().any(|value| value == "SERVICE"));
5201 assert!(accepted.iter().any(|value| value == "MISSING_FIELD"));
5202 let histograms = response["available_histograms"]
5203 .as_array()
5204 .expect("available histograms");
5205 assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5206 assert!(
5207 histograms
5208 .iter()
5209 .any(|value| value["id"] == "MISSING_FIELD")
5210 );
5211 }
5212
5213 #[test]
5214 fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5215 let dir = TempDir::new().expect("tempdir");
5216 write_netdata_test_journal(dir.path(), 10);
5217 let request = json!({
5218 "after": 1_700_000_000,
5219 "before": 1_700_000_010,
5220 "facets": ["PRIORITY"],
5221 "histogram": "PRIORITY",
5222 "selections": {
5223 "SERVICE": ["missing"]
5224 },
5225 "last": 5,
5226 "slice": true
5227 });
5228 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5229
5230 let response = function
5231 .run_directory_request_json_with_options(
5232 dir.path(),
5233 &request,
5234 NetdataFunctionRunOptions::from_timeout_seconds(0),
5235 )
5236 .expect("run function");
5237
5238 let facets = response["facets"].as_array().expect("facets");
5239 assert_eq!(facets.len(), 1);
5240 assert_eq!(facets[0]["id"], "PRIORITY");
5241 let options = facets[0]["options"].as_array().expect("options");
5242 assert!(options.iter().any(|option| {
5243 option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5244 }));
5245 assert_eq!(response["items"]["matched"], 0);
5246 }
5247
5248 #[test]
5249 fn netdata_function_api_reports_progress() {
5250 let dir = TempDir::new().expect("tempdir");
5251 write_netdata_test_journal(dir.path(), 9_000);
5252 let request = json!({
5253 "after": 1_700_000_000,
5254 "before": 1_700_000_010,
5255 "facets": ["SERVICE"],
5256 "histogram": "SERVICE",
5257 "last": 0
5258 });
5259 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5260 let mut reports = 0u64;
5261 let mut progress = |progress: NetdataFunctionProgress| {
5262 reports = reports.saturating_add(1);
5263 assert_eq!(progress.current_file, 1);
5264 assert_eq!(progress.total_files, 1);
5265 assert!(progress.stats.rows_examined <= 9_000);
5266 };
5267 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5268 options.progress_interval = Duration::ZERO;
5269 options.progress_callback = Some(&mut progress);
5270
5271 let response = function
5272 .run_directory_request_json_with_options(dir.path(), &request, options)
5273 .expect("run function");
5274
5275 assert_eq!(response["status"], 200);
5276 assert!(reports > 0);
5277 assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5278 }
5279
5280 #[test]
5281 fn netdata_function_api_reports_file_end_progress_for_small_scans() {
5282 let dir = TempDir::new().expect("tempdir");
5283 write_netdata_test_journal(dir.path(), 10);
5284 let request = json!({
5285 "after": 1_700_000_000,
5286 "before": 1_700_000_010,
5287 "facets": ["SERVICE"],
5288 "histogram": "SERVICE",
5289 "last": 0
5290 });
5291 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5292 let mut reports = 0u64;
5293 let mut last_rows_examined = 0u64;
5294 let mut progress = |progress: NetdataFunctionProgress| {
5295 reports = reports.saturating_add(1);
5296 last_rows_examined = progress.stats.rows_examined;
5297 };
5298 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5299 options.progress_callback = Some(&mut progress);
5300
5301 let response = function
5302 .run_directory_request_json_with_options(dir.path(), &request, options)
5303 .expect("run function");
5304
5305 assert_eq!(response["status"], 200);
5306 assert_eq!(reports, 1);
5307 assert_eq!(last_rows_examined, 10);
5308 }
5309
/// Writes two journals whose rows start at different timestamps and queries a
/// window that covers only one of them; the response must report a single
/// matched file and the only progress report must be `(1, 1)`.
#[test]
fn netdata_function_progress_counts_only_query_files() {
    let journal_dir = TempDir::new().expect("tempdir");
    // One journal starting at 1_600_000_000 s, one starting at 1_700_000_000 s.
    write_named_netdata_test_journal(
        journal_dir.path(),
        "old-window.journal",
        10,
        1_600_000_000_000_000,
    );
    write_named_netdata_test_journal(
        journal_dir.path(),
        "current-window.journal",
        10,
        1_700_000_000_000_000,
    );

    // Each report is recorded as (current_file, total_files).
    let mut file_positions = Vec::new();
    let mut on_progress = |update: NetdataFunctionProgress| {
        file_positions.push((update.current_file, update.total_files));
    };
    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_callback = Some(&mut on_progress);

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(response["_journal_files"]["matched"], 1);
    assert_eq!(file_positions, vec![(1, 1)]);
}
5348
/// Runs a request whose cancellation callback always returns `true` and
/// checks for a 499 response containing only `status` and `errorMessage`.
#[test]
fn netdata_function_api_reports_cancellation() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);

    let always_cancelled = || true;
    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.cancellation_callback = Some(&always_cancelled);

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");

    let field_count = response.as_object().expect("response object").len();
    assert_eq!(
        field_count, 2,
        "plugin-compatible function errors only include status and errorMessage"
    );
}
5377
/// Requests cancellation from inside the progress callback once any row has
/// been examined, and checks that the run ends with a 499 response.
#[test]
fn netdata_function_api_cancels_during_active_scan() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);

    // Shared flag: set by the progress callback, read by the cancellation callback.
    let cancel_requested = Cell::new(false);
    let mut report_count = 0u64;
    let mut on_progress = |update: NetdataFunctionProgress| {
        report_count = report_count.saturating_add(1);
        if update.stats.rows_examined > 0 {
            cancel_requested.set(true);
        }
    };
    let cancellation_probe = || cancel_requested.get();

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_interval = Duration::ZERO;
    run_options.progress_callback = Some(&mut on_progress);
    run_options.cancellation_callback = Some(&cancellation_probe);

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");
    assert!(report_count > 0);
    assert!(cancel_requested.get());
}
5413
/// Sets the cancellation flag from every progress report of a 10-row scan
/// and checks that the run still ends with a 499 response.
#[test]
fn netdata_function_api_honors_cancellation_after_final_file_progress() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);

    // Any progress report flips the flag that the cancellation callback reads.
    let cancel_requested = Cell::new(false);
    let mut on_progress = |_update: NetdataFunctionProgress| {
        cancel_requested.set(true);
    };
    let cancellation_probe = || cancel_requested.get();

    let mut run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    run_options.progress_callback = Some(&mut on_progress);
    run_options.cancellation_callback = Some(&cancellation_probe);

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 499);
    assert_eq!(response["errorMessage"], "Request cancelled.");
    assert!(cancel_requested.get());
}
5443
/// Runs a request with a zero-duration timeout and checks that the response
/// is a 200 marked `partial` and carrying the timed-out warning message.
#[test]
fn netdata_function_api_reports_timeout_as_partial_table() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);

    let run_options = NetdataFunctionRunOptions {
        timeout: Some(Duration::ZERO),
        ..NetdataFunctionRunOptions::default()
    };
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });

    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(response["partial"], true);

    let warning = &response["message"];
    assert_eq!(warning["status"], "warning");
    // The expected title ends with a trailing space.
    assert_eq!(warning["title"], "Query timed-out, incomplete data. ");
}
5473
/// Runs a request with `"sampling": 20` over a 5 000-row journal and checks
/// that the `_sampling` counters are non-zero, that `items.estimated` mirrors
/// `_sampling.estimated`, that `items.unsampled` is smaller than
/// `_sampling.unsampled`, and that the message status is `notice`.
#[test]
fn netdata_function_api_reports_sampling_counters() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        5_000,
        1_700_000_000_000_000,
        1_000,
    );

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_005,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 5,
        "sampling": 20
    });
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let response = journal_function
        .run_directory_request_json_with_options(
            journal_dir.path(),
            &payload,
            NetdataFunctionRunOptions::from_timeout_seconds(0),
        )
        .expect("run function");

    // Reads `response[section][key]` as u64; missing or non-numeric entries count as 0.
    let counter =
        |section: &str, key: &str| response[section][key].as_u64().unwrap_or_default();

    assert_eq!(response["status"], 200);
    assert!(counter("_sampling", "sampled") > 0);
    assert!(counter("_sampling", "unsampled") > 0);
    assert!(counter("_sampling", "estimated") > 0);
    assert_eq!(
        response["items"]["estimated"],
        response["_sampling"]["estimated"]
    );
    assert!(counter("items", "unsampled") < counter("_sampling", "unsampled"));
    assert_eq!(response["message"]["status"], "notice");
}
5533
/// Combines `"data_only": true` with `"sampling": 20` and checks that the
/// response carries no `_sampling` key.
#[test]
fn netdata_function_api_disables_sampling_for_data_only() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 5_000);

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "data_only": true,
        "last": 5,
        "sampling": 20
    });
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert!(response.get("_sampling").is_none());
}
5558
/// With neither bound supplied, `normalize_time_window(1_000_000_000, ..)`
/// must yield a window ending at 1_000_000_000.999999 s and starting
/// 3 600 s earlier (both expressed in microseconds).
#[test]
fn normalizes_missing_time_window_to_last_hour_like_plugin() {
    let (window_start, window_end) = normalize_time_window(1_000_000_000, None, None);

    assert_eq!(window_start, Some(999_996_400_000_000));
    assert_eq!(window_end, Some(1_000_000_000_999_999));
}
5566
/// When the second bound (200_000_100) is larger than the third (200_000_000),
/// the resulting window must run from the smaller to the larger value.
#[test]
fn normalizes_inverted_time_window_like_plugin() {
    let (window_start, window_end) =
        normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000));

    assert_eq!(window_start, Some(200_000_000_000_000));
    assert_eq!(window_end, Some(200_000_100_999_999));
}
5574
/// When both bounds are 200_000_000, the resulting window must end at that
/// second and start 3 600 s before it.
#[test]
fn normalizes_equal_time_window_like_plugin() {
    let (window_start, window_end) =
        normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000));

    assert_eq!(window_start, Some(199_996_400_000_000));
    assert_eq!(window_end, Some(200_000_000_999_999));
}
5582
/// Pins the output for the small bounds `Some(100)` / `Some(200)` with a
/// first argument of 1_000_000_000: the expected window lies just below
/// 1_000_000_000 s rather than at 100 s / 200 s.
#[test]
fn normalizes_relative_time_window_like_plugin() {
    let (window_start, window_end) =
        normalize_time_window(1_000_000_000, Some(100), Some(200));

    assert_eq!(window_start, Some(999_999_701_000_000));
    assert_eq!(window_end, Some(999_999_800_999_999));
}
5590
/// With only the last bound supplied (200_000_000), the expected window ends
/// at that second and starts at 199_999_401 s.
#[test]
fn normalizes_missing_after_with_supplied_before_like_plugin() {
    let (window_start, window_end) =
        normalize_time_window(1_000_000_000, None, Some(200_000_000));

    assert_eq!(window_start, Some(199_999_401_000_000));
    assert_eq!(window_end, Some(200_000_000_999_999));
}
5598
/// Checks the `SystemdJournalProfile` display values for `PRIORITY` 7 and
/// `SYSLOG_FACILITY` 3 in the data scope, plus the row severity derived from
/// priorities 3 and 6.
#[test]
fn systemd_profile_transforms_priority_and_facility_for_display() {
    let context = DisplayContext::default();
    let profile = SystemdJournalProfile;
    // Shorthand for a data-scope lookup against the shared context.
    let data_display = |field: &str, raw: &[u8]| {
        profile.field_display_value(&context, DisplayScope::Data, field, raw)
    };

    assert_eq!(data_display("PRIORITY", b"7"), json!("debug"));
    assert_eq!(data_display("SYSLOG_FACILITY", b"3"), json!("daemon"));

    assert_eq!(priority_to_row_severity(b"3"), "critical");
    assert_eq!(priority_to_row_severity(b"6"), "normal");
}
5614
/// Walks `dynamic_process_name` through a sequence of field sets and pins the
/// name it produces as fields are added and removed.
#[test]
fn dynamic_process_name_matches_plugin_fallback_order() {
    let mut entry = BTreeMap::from([
        (String::from("SYSLOG_IDENTIFIER"), vec![b"syslog".to_vec()]),
        (String::from("_COMM"), vec![b"comm".to_vec()]),
        (String::from("_PID"), vec![b"42".to_vec()]),
        (String::from("SYSLOG_PID"), vec![b"99".to_vec()]),
    ]);
    assert_eq!(dynamic_process_name(&entry), "syslog[42]");

    // Adding a container name changes the name part.
    entry.insert(String::from("CONTAINER_NAME"), vec![b"container".to_vec()]);
    assert_eq!(dynamic_process_name(&entry), "container[42]");

    // Only `_COMM` and `SYSLOG_PID` remain.
    for removed in ["CONTAINER_NAME", "SYSLOG_IDENTIFIER", "_PID"] {
        entry.remove(removed);
    }
    assert_eq!(dynamic_process_name(&entry), "comm[-]");

    // `_PID` present with an empty value.
    entry.insert(String::from("_PID"), vec![Vec::new()]);
    assert_eq!(dynamic_process_name(&entry), "comm");

    // `_EXE` replaces `_COMM` and `_PID`.
    for removed in ["_COMM", "_PID"] {
        entry.remove(removed);
    }
    entry.insert(String::from("_EXE"), vec![b"/usr/bin/app".to_vec()]);
    assert_eq!(dynamic_process_name(&entry), "-");
}
5640
/// Two values sharing a `NETDATA_FACET_MAX_VALUE_LENGTH`-byte prefix and
/// differing only after it must end up under one key (the prefix) with their
/// counts summed.
#[test]
fn facet_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let long_first = [shared_prefix.as_slice(), b"-first".as_slice()].concat();
    let long_second = [shared_prefix.as_slice(), b"-second".as_slice()].concat();

    let mut counts = BTreeMap::new();
    add_netdata_facet_count(&mut counts, &long_first, 2);
    add_netdata_facet_count(&mut counts, &long_second, 3);

    assert_eq!(counts.len(), 1);
    assert_eq!(counts.get(&shared_prefix), Some(&5));
}
5656
/// Two histogram values sharing a `NETDATA_FACET_MAX_VALUE_LENGTH`-byte
/// prefix must be rendered as one label (the prefix) whose data point is the
/// sum of both counts.
#[test]
fn histogram_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let long_first = [shared_prefix.as_slice(), b"-first".as_slice()].concat();
    let long_second = [shared_prefix.as_slice(), b"-second".as_slice()].concat();

    let histogram = ExplorerHistogram {
        field: b"TEST_FIELD".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_000_000,
            end_realtime_usec: 2_000_000,
            values: [(long_first, 2), (long_second, 3)].into_iter().collect(),
        }],
    };

    let rendered = NetdataJournalFunction::systemd_journal().build_histogram(
        &DisplayContext::default(),
        &histogram,
        None,
    );
    let chart_result = &rendered["chart"]["result"];
    let labels = chart_result["labels"].as_array().expect("labels");

    assert_eq!(labels.len(), 2);
    assert_eq!(
        labels[1],
        Value::String(String::from_utf8(shared_prefix).unwrap())
    );
    assert_eq!(chart_result["data"][0][1][0], json!(5));
}
5686
/// Renders a single-bucket histogram twice: with no values the dimension
/// `names`/`ids` arrays must be present but empty; with one value (`warning`
/// = 7) the view dimensions must list it and report 7 as its `sts.min`.
#[test]
fn histogram_chart_metadata_includes_empty_dimension_arrays() {
    let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    // Builds the one-bucket TRAP_SEVERITY histogram around the given value map.
    let single_bucket = |values| ExplorerHistogram {
        field: b"TRAP_SEVERITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_700_000_000_000_000,
            end_realtime_usec: 1_700_000_005_000_000,
            values,
        }],
    };

    let without_values = function.build_histogram(
        &DisplayContext::default(),
        &single_bucket(HashMap::new()),
        None,
    );
    let empty_chart = &without_values["chart"];
    assert_eq!(empty_chart["view"]["dimensions"]["names"], json!([]));
    assert_eq!(empty_chart["view"]["dimensions"]["ids"], json!([]));
    assert_eq!(empty_chart["db"]["dimensions"]["names"], json!([]));

    let with_warning = function.build_histogram(
        &DisplayContext::default(),
        &single_bucket([(b"warning".to_vec(), 7)].into_iter().collect()),
        None,
    );
    let view_dimensions = &with_warning["chart"]["view"]["dimensions"];
    assert_eq!(view_dimensions["names"], json!(["warning"]));
    assert_eq!(view_dimensions["sts"]["min"], json!([7]));
}
5731
/// Rows sharing a timestamp must be spread apart by `make_row_timestamps_unique`:
/// downwards (100, 99, 98) for `Direction::Backward` and upwards
/// (100, 101, 102) for `Direction::Forward`.
#[test]
fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
    let mut newest_first: Vec<_> = [100, 100, 100, 90]
        .into_iter()
        .map(test_located_row)
        .collect();
    make_row_timestamps_unique(&mut newest_first, Direction::Backward);
    let backward_stamps: Vec<_> = newest_first
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(backward_stamps, vec![100, 99, 98, 90]);

    let mut oldest_first: Vec<_> = [90, 100, 100, 100]
        .into_iter()
        .map(test_located_row)
        .collect();
    make_row_timestamps_unique(&mut oldest_first, Direction::Forward);
    let forward_stamps: Vec<_> = oldest_first
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(forward_stamps, vec![90, 100, 101, 102]);
}
5764
/// Feeds five timestamps into a forward page window anchored at
/// 1_700_000_005 s with `last = 2` and pins the resulting
/// matched / before / after counters.
#[test]
fn page_window_counts_forward_anchor_like_netdata_facets() {
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_005_000_000u64,
        "direction": "forward",
        "last": 2
    });
    let config = NetdataFunctionConfig::systemd_journal();
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");

    let mut window = NetdataPageWindow::for_request(&request);
    // Two rows below the anchor, three above it.
    [
        1_700_000_003_000_000,
        1_700_000_004_000_000,
        1_700_000_006_000_000,
        1_700_000_007_000_000,
        1_700_000_008_000_000,
    ]
    .into_iter()
    .for_each(|realtime_usec| {
        window.observe(realtime_usec);
    });

    let totals = window.counters();
    assert_eq!((totals.matched, totals.before, totals.after), (3, 1, 2));
}
5796
/// Feeds four timestamps into a backward page window anchored at
/// 1_700_000_008 s with `last = 2` and pins the resulting
/// matched / before / after counters.
#[test]
fn page_window_counts_backward_anchor_like_netdata_facets() {
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "direction": "backward",
        "last": 2
    });
    let config = NetdataFunctionConfig::systemd_journal();
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");

    let mut window = NetdataPageWindow::for_request(&request);
    // One row above the anchor, three below it, newest first.
    [
        1_700_000_009_000_000,
        1_700_000_007_000_000,
        1_700_000_006_000_000,
        1_700_000_005_000_000,
    ]
    .into_iter()
    .for_each(|realtime_usec| {
        window.observe(realtime_usec);
    });

    let totals = window.counters();
    assert_eq!((totals.matched, totals.before, totals.after), (3, 1, 1));
}
5827
/// Feeds three timestamps (above, at, and below the anchor) into the page
/// window of a tail request anchored at 1_700_000_008 s and pins the
/// resulting matched / before / after counters.
#[test]
fn page_window_counts_tail_anchor_like_netdata_facets() {
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "if_modified_since": 1_700_000_008_000_000u64,
        "data_only": true,
        "tail": true,
        "last": 2
    });
    let config = NetdataFunctionConfig::systemd_journal();
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");

    let mut window = NetdataPageWindow::for_request(&request);
    [
        1_700_000_009_000_000,
        1_700_000_008_000_000,
        1_700_000_007_000_000,
    ]
    .into_iter()
    .for_each(|realtime_usec| {
        window.observe(realtime_usec);
    });

    let totals = window.counters();
    assert_eq!((totals.matched, totals.before, totals.after), (1, 0, 2));
}
5859
/// Issues a tail request anchored at the first of two rows with a
/// `SERVICE = even` selection and checks for a 200 response whose `MESSAGE`
/// column is empty.
#[test]
fn netdata_function_tail_anchor_with_newer_filtered_out_rows_returns_empty_200() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_row_usec = 1_700_000_000_000_000;
    // Two rows, one second apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        2,
        first_row_usec,
        1_000_000,
    );

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_002,
        "anchor": first_row_usec,
        "if_modified_since": first_row_usec,
        "data_only": true,
        "tail": true,
        "last": 5,
        "selections": {
            "SERVICE": ["even"]
        }
    });
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);
    let journal_function = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let response = journal_function
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    let messages = response_column_strings(&response, "MESSAGE");
    assert_eq!(messages, Vec::<String>::new());
}
5899
/// Collects every page (page size 2) of a 7-row journal in both directions
/// and pins the exact message and timestamp sequences, also checking that no
/// message appears twice.
#[test]
fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_row_usec = 1_700_000_000_000_000;
    // Seven rows, one microsecond apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "paging-contract.journal",
        7,
        first_row_usec,
        1,
    );

    // Backward paging: strictly newest to oldest.
    let newest_first = collect_netdata_pages(journal_dir.path(), Direction::Backward, 2);
    assert_eq!(
        newest_first.messages,
        vec![
            "row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"
        ]
    );
    assert_unique_messages(&newest_first.messages);
    let descending: Vec<_> = (0..=6)
        .rev()
        .map(|offset| first_row_usec + offset)
        .collect();
    assert_eq!(newest_first.timestamps, descending);

    // Forward paging: pages advance in time, rows inside a page are newest first.
    let oldest_first = collect_netdata_pages(journal_dir.path(), Direction::Forward, 2);
    assert_eq!(
        oldest_first.messages,
        vec![
            "row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"
        ]
    );
    assert_unique_messages(&oldest_first.messages);
    let paged_ascending: Vec<_> = [1, 0, 3, 2, 5, 4, 6]
        .into_iter()
        .map(|offset| first_row_usec + offset)
        .collect();
    assert_eq!(oldest_first.timestamps, paged_ascending);
}
5954
/// Tail requests anchored at row 2 of a 5-row journal must return rows 4 and
/// 3 and echo direction `backward` whichever direction was requested; a tail
/// request anchored at the last row must answer 304.
#[test]
fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_row_usec = 1_700_000_000_000_000;
    // Five rows, one microsecond apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "tail-contract.journal",
        5,
        first_row_usec,
        1,
    );
    let mid_anchor = first_row_usec + 2;

    for requested in [Direction::Backward, Direction::Forward] {
        let payload = json!({
            "after": 1_700_000_000,
            "before": 1_700_000_010,
            "last": 5,
            "direction": direction_name(requested),
            "data_only": true,
            "tail": true,
            "if_modified_since": mid_anchor,
            "anchor": mid_anchor,
        });
        let response = run_netdata_contract_request(journal_dir.path(), payload);

        assert_eq!(response["status"], 200);
        assert_eq!(response["_request"]["direction"], "backward");

        let messages = response_column_strings(&response, "MESSAGE");
        assert_eq!(messages, vec!["row-4", "row-3"]);
        let timestamps = response_column_u64s(&response, "timestamp");
        assert_eq!(timestamps, vec![first_row_usec + 4, first_row_usec + 3]);
    }

    // Anchoring at the newest row leaves nothing new to report.
    let unchanged_payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "last": 5,
        "direction": "backward",
        "data_only": true,
        "tail": true,
        "if_modified_since": first_row_usec + 4,
        "anchor": first_row_usec + 4,
    });
    let unchanged = run_netdata_contract_request(journal_dir.path(), unchanged_payload);

    assert_eq!(unchanged["status"], 304);
    assert_eq!(
        unchanged["errorMessage"],
        "No new data since the previous call."
    );
}
6013
/// Issues a `delta` + `tail` request anchored at row 1 of a 5-row journal
/// with `last = 2` and pins the returned rows, the `facets_delta` and
/// `histogram_delta` totals per `SERVICE` value, and the `items_delta`
/// counters.
#[test]
fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_row_usec = 1_700_000_000_000_000;
    // Five rows, one microsecond apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "delta-contract.journal",
        5,
        first_row_usec,
        1,
    );
    let anchor = first_row_usec + 1;

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "last": 2,
        "direction": "backward",
        "data_only": true,
        "delta": true,
        "tail": true,
        "if_modified_since": anchor,
        "anchor": anchor,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
    });
    let response = run_netdata_contract_request(journal_dir.path(), payload);

    assert_eq!(response["status"], 200);
    assert_eq!(
        response_column_strings(&response, "MESSAGE"),
        vec!["row-4", "row-3"]
    );

    // Three rows are newer than the anchor: two "even" and one "odd".
    for (service, expected) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_facet_count(&response, "facets_delta", "SERVICE", service),
            expected
        );
    }
    for (service, expected) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_histogram_total(&response, "histogram_delta", service),
            expected
        );
    }

    let item_counters = response["items_delta"].as_object().expect("items_delta");
    assert_eq!(item_counters["matched"], 3);
    assert_eq!(item_counters["returned"], 2);
    assert_eq!(item_counters["after"], 2);
}
6070
/// Three consecutive `adjust(10)` calls on a forward adjuster must yield
/// 10, 11, 12.
#[test]
fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
    let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);

    let adjusted = [10; 3].map(|realtime_usec| adjuster.adjust(realtime_usec));

    assert_eq!(adjusted, [10, 11, 12]);
}
6079
/// Three consecutive `adjust(10)` calls on a backward adjuster must yield
/// 10, 9, 8.
#[test]
fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
    let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);

    let adjusted = [10; 3].map(|realtime_usec| adjuster.adjust(realtime_usec));

    assert_eq!(adjusted, [10, 9, 8]);
}
6088
/// `SystemdJournalProfile` must display `_UID` and `_GID` value `0` in the
/// facet scope as the raw string `"0"`.
#[test]
fn systemd_profile_keeps_user_group_ids_raw_by_default() {
    let profile = SystemdJournalProfile;
    let context = DisplayContext::default();

    for id_field in ["_UID", "_GID"] {
        let shown = profile.field_display_value(&context, DisplayScope::Facet, id_field, b"0");
        assert_eq!(shown, json!("0"));
    }
}
6102
/// `SystemdJournalPluginProfile` must display `_UID` and `_GID` value `0` in
/// the facet scope as `"root"` (Unix only).
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
    let profile = SystemdJournalPluginProfile;
    let context = DisplayContext::default();

    for id_field in ["_UID", "_GID"] {
        let shown = profile.field_display_value(&context, DisplayScope::Facet, id_field, b"0");
        assert_eq!(shown, json!("root"));
    }
}
6117
/// Resolves `_UID` 0 and `_GID` 0 in both the facet and data scopes against
/// one shared context, then checks that each display cache holds exactly one
/// entry (Unix only).
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_caches_user_group_resolution() {
    let profile = SystemdJournalPluginProfile;
    let context = DisplayContext::default();

    // Order of lookups: _UID facet, _UID data, _GID facet, _GID data.
    for id_field in ["_UID", "_GID"] {
        for scope in [DisplayScope::Facet, DisplayScope::Data] {
            assert_eq!(
                profile.field_display_value(&context, scope, id_field, b"0"),
                json!("root")
            );
        }
    }

    assert_eq!(context.uid_display_cache.borrow().len(), 1);
    assert_eq!(context.gid_display_cache.borrow().len(), 1);
}
6144
/// A request starting one second inside
/// `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC` past the file's tail entry
/// must be reported as possibly overlapping the file; a request starting one
/// second beyond that slack must not.
#[test]
fn file_overlap_uses_netdata_max_realtime_slack() {
    let file_first_seconds = 200_000_000u64;
    let file_last_seconds = 200_000_100u64;
    let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;

    // Only the head/tail realtime fields carry meaningful values here.
    let header = crate::FileHeader {
        signature: *b"LPKSHHRH",
        compatible_flags: 0,
        incompatible_flags: 0,
        state: 0,
        header_size: 0,
        n_entries: 0,
        head_entry_realtime: file_first_seconds * 1_000_000,
        tail_entry_realtime: file_last_seconds * 1_000_000,
        head_entry_seqnum: 0,
        tail_entry_seqnum: 0,
        tail_entry_boot_id: [0; 16],
        seqnum_id: [0; 16],
    };
    let config = NetdataFunctionConfig::systemd_journal();

    // Parses a request starting at `after_seconds` and tests it against the header.
    let overlaps_when_after = |after_seconds: u64| {
        let request = NetdataRequest::parse(
            &json!({
                "after": after_seconds,
                "before": file_last_seconds + slack_seconds + 500
            }),
            &config,
        )
        .expect("parse request");
        file_may_overlap_request(header, &request)
    };

    assert!(overlaps_when_after(file_last_seconds + slack_seconds - 1));
    assert!(!overlaps_when_after(file_last_seconds + slack_seconds + 1));
}
6186
/// Pins `compare_journal_file_order` results when a file has a later last
/// message, a later modification time, or a later first message than the
/// baseline file.
#[test]
fn journal_file_order_matches_plugin_comparator_shape() {
    // Builds an order record from (first message, last message, file mtime).
    let order_info = |first, last, modified| JournalFileOrderInfo {
        msg_first_realtime_usec: first,
        msg_last_realtime_usec: last,
        file_last_modified_usec: modified,
        journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
    };
    let baseline = order_info(100, 200, 200);

    // A later last message wins even though the file mtime is older.
    let later_last_message = order_info(100, 300, 100);
    assert_eq!(
        compare_journal_file_order(&later_last_message, &baseline, Direction::Backward),
        Ordering::Less
    );
    assert_eq!(
        compare_journal_file_order(&later_last_message, &baseline, Direction::Forward),
        Ordering::Greater
    );

    // Same message range, later mtime.
    let later_mtime = order_info(100, 200, 300);
    assert_eq!(
        compare_journal_file_order(&later_mtime, &baseline, Direction::Backward),
        Ordering::Less
    );

    // Same last message and mtime, later first message.
    let later_first_message = order_info(150, 200, 200);
    assert_eq!(
        compare_journal_file_order(&later_first_message, &baseline, Direction::Backward),
        Ordering::Less
    );
}
6232
/// Recording 300, 100 and 200 for the same boot id must leave 100 stored.
#[test]
fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
    let mut earliest_by_boot = BTreeMap::new();

    for realtime_usec in [300, 100, 200] {
        record_boot_first_realtime(&mut earliest_by_boot, b"boot-a".to_vec(), realtime_usec);
    }

    assert_eq!(earliest_by_boot.get(b"boot-a".as_slice()), Some(&100));
}
6242
/// Merging a histogram whose single bucket ends at a different time than the
/// target's bucket must fail with the "inconsistent Netdata histogram bucket
/// shape" `SdkError::Unsupported` error.
#[test]
fn merge_histogram_rejects_inconsistent_bucket_shape() {
    // One empty PRIORITY bucket starting at 1 s and ending at the given time.
    let single_bucket = |end_realtime_usec| ExplorerHistogram {
        field: b"PRIORITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_000_000,
            end_realtime_usec,
            values: HashMap::new(),
        }],
    };
    let mut target = Some(single_bucket(2_000_000));
    let mismatched = single_bucket(1_500_000);

    let failure = merge_histogram(&mut target, mismatched).expect_err("shape mismatch");

    assert!(matches!(
        failure,
        SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
    ));
}
6268
/// A journal written two directory levels below the scan root must be the
/// single collected file, with nothing skipped and no errors.
#[test]
fn collect_journal_files_recurses_nested_directories() {
    let scan_root = TempDir::new().expect("tempdir");
    let deep_dir = scan_root.path().join("machine").join("nested");
    write_named_netdata_test_journal(&deep_dir, "system.journal", 1, 1_700_000_000_000_000);

    let found = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(found.files.len(), 1);
    assert_eq!(found.skipped, 0);
    assert!(found.errors.is_empty());

    let collected_name = found.files[0].file_name().and_then(|name| name.to_str());
    assert_eq!(collected_name, Some("system.journal"));
}
6287
/// A directory reachable both directly and through a symlink must contribute
/// its journal only once (Unix only).
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_directories() {
    let scan_root = TempDir::new().expect("tempdir");
    let target_dir = scan_root.path().join("real");
    let alias_dir = scan_root.path().join("link");
    write_named_netdata_test_journal(&target_dir, "system.journal", 1, 1_700_000_000_000_000);
    std::os::unix::fs::symlink(&target_dir, &alias_dir).expect("symlink");

    let found = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(found.files.len(), 1);
    assert_eq!(found.skipped, 0);
    assert!(found.errors.is_empty());
}
6303
/// A journal file and a symlink pointing at it in the same directory must be
/// collected as a single file (Unix only).
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_files() {
    let scan_root = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(
        scan_root.path(),
        "system.journal",
        1,
        1_700_000_000_000_000,
    );
    let real_file = scan_root.path().join("system.journal");
    let alias_file = scan_root.path().join("system-copy.journal");
    std::os::unix::fs::symlink(real_file, alias_file).expect("symlink");

    let found = collect_journal_files(scan_root.path()).expect("collect files");

    assert_eq!(found.files.len(), 1);
    assert_eq!(found.skipped, 0);
    assert!(found.errors.is_empty());
}
6321
/// A subdirectory with mode `0o000` must be counted as skipped and reported
/// in `errors`, while the readable journal next to it is still collected
/// (Unix only).
///
/// The test returns early when the effective uid is 0.
#[cfg(unix)]
#[test]
fn collect_journal_files_reports_unreadable_subdirectories() {
    use std::os::unix::fs::PermissionsExt;

    // SAFETY: `geteuid` takes no arguments and has no preconditions.
    if unsafe { libc::geteuid() } == 0 {
        // NOTE(review): presumably skipped because root is not blocked by the
        // 0o000 mode, so the directory would not be unreadable — confirm.
        return;
    }

    let dir = TempDir::new().expect("tempdir");
    std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
    let locked = dir.path().join("locked");
    std::fs::create_dir(&locked).expect("locked dir");
    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
        .expect("lock dir");

    // Keep the outcome as a `Result` for now: permissions must be restored
    // before anything below is allowed to panic.
    let outcome = collect_journal_files(dir.path());

    // Restore access first so the temporary directory can be cleaned up even
    // when the collection itself failed.
    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
        .expect("unlock dir");

    let collection = outcome.expect("collect files");
    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 1);
    assert_eq!(collection.errors.len(), 1);
    assert!(collection.errors[0].contains("locked"));
}
6347
/// Supplies caller metadata containing only `msg_first_realtime_usec` for a
/// 2-row journal and checks that the summary keeps that first timestamp while
/// still reporting a last timestamp of 1_700_000_000_000_001.
#[test]
fn source_summary_fills_missing_caller_metadata_from_header() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(
        journal_dir.path(),
        "system.journal",
        2,
        1_700_000_000_000_000,
    );
    let journal_path = journal_dir.path().join("system.journal");

    // Caller-side state knows the first message time only.
    let partial_metadata = NetdataJournalFileMetadata {
        msg_first_realtime_usec: Some(1_699_999_999_000_000),
        ..NetdataJournalFileMetadata::default()
    };
    let mut caller_state = TestNetdataState::default();
    caller_state
        .metadata
        .insert(journal_path.clone(), partial_metadata);

    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut caller_state),
        ..NetdataFunctionRunOptions::default()
    };
    let summary = JournalSourceSummary::from_paths(
        &[journal_path],
        ReaderOptions::default(),
        &run_options,
    );

    assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
    assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
}
6371
/// `info()` reports "covering off" while the covered span is below one second
/// and switches to "covering 1s" once the span reaches exactly 1_000_000 usec.
#[test]
fn source_summary_coverage_is_off_below_one_second() {
    let start = 1_700_000_000_000_000;
    let mut summary = JournalSourceSummary {
        files: 1,
        total_size: 0,
        first_realtime_usec: Some(start),
        last_realtime_usec: Some(start + 999_999),
    };

    // One microsecond short of a full second.
    let sub_second_info = summary.info();
    assert!(sub_second_info.contains("covering off"));

    // Exactly one second.
    summary.last_realtime_usec = Some(start + 1_000_000);
    let one_second_info = summary.info();
    assert!(one_second_info.contains("covering 1s"));
}
6387
/// Selecting the `all-local-system-logs` group sets the request's source type
/// to `SOURCE_TYPE_LOCAL_SYSTEM`, echoes that source type back, echoes a null
/// in place of the selection entry, and makes `matches_source` accept
/// `system.journal` while rejecting `user-1000.journal`.
#[test]
fn source_selection_echoes_and_filters_known_groups() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "selections": {
            "__logs_sources": ["all-local-system-logs"]
        }
    });
    let request =
        NetdataRequest::parse(&payload, &config).expect("parse source-filtered request");

    assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);

    let echoed_source_type = request.echo.get("source_type").and_then(Value::as_u64);
    assert_eq!(echoed_source_type, Some(SOURCE_TYPE_LOCAL_SYSTEM));

    let echoed_first_source = request.echo.pointer("/selections/__logs_sources/0");
    assert!(echoed_first_source.is_some_and(Value::is_null));

    let system_journal = Path::new("/var/log/journal/machine/system.journal");
    let user_journal = Path::new("/var/log/journal/machine/user-1000.journal");
    assert!(request.matches_source(system_journal, None));
    assert!(!request.matches_source(user_journal, None));
}
6418
/// A file named `user-1000.journal` does not match a local-system selection
/// when no metadata is given, but is selected once caller-supplied metadata
/// marks it as a local-system source.
#[test]
fn source_selection_uses_caller_metadata_before_filename_fallback() {
    let tmp = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(tmp.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
    let path = tmp.path().join("user-1000.journal");

    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_001,
        "selections": {
            "__logs_sources": ["all-local-system-logs"]
        }
    });
    let request =
        NetdataRequest::parse(&payload, &config).expect("parse source-filtered request");

    // Without metadata the request does not accept this path.
    assert!(!request.matches_source(&path, None));

    // Caller metadata that overrides what the filename suggests.
    let caller_metadata = NetdataJournalFileMetadata {
        source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
        source_name: Some("system-registry".to_string()),
        ..NetdataJournalFileMetadata::default()
    };
    let mut state = TestNetdataState::default();
    state.metadata.insert(path.clone(), caller_metadata);

    let options = NetdataFunctionRunOptions {
        state: Some(&mut state),
        ..NetdataFunctionRunOptions::default()
    };
    let candidates = vec![path];
    let selected =
        select_journal_files_for_request(candidates, &request, config.reader_options, &options);

    assert_eq!(selected.files.len(), 1);
}
6456
/// Runs the function over a journal whose single entry carries a
/// `_SOURCE_REALTIME_TIMESTAMP` 30 seconds before its commit time, then checks
/// that the state received exactly one update whose second element is
/// 30_000_000.
#[test]
fn netdata_function_state_receives_learned_source_realtime_delta() {
    let tmp = TempDir::new().expect("tempdir");
    let commit_realtime_usec = 1_700_000_030_000_000;
    write_source_realtime_delta_journal(
        tmp.path(),
        "system.journal",
        commit_realtime_usec,
        commit_realtime_usec - 30_000_000,
    );

    let query = json!({
        "after": 1_700_000_029,
        "before": 1_700_000_031,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 1,
        "sampling": 0
    });

    let mut state = TestNetdataState::default();
    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut state),
        ..NetdataFunctionRunOptions::from_timeout_seconds(0)
    };
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();

    // `run_options` is passed by value, so `state` can be inspected directly
    // once the call returns.
    let response = plugin
        .run_directory_request_json_with_options(tmp.path(), &query, run_options)
        .expect("run function");

    assert_eq!(response["status"], 200);
    assert_eq!(state.updates.len(), 1);
    assert_eq!(state.updates[0].1, 30_000_000);
}
6491
/// Pins the source-type bitmask that `journal_file_source_type` derives from a
/// journal path: system, user, other and namespace files under a machine
/// directory, plus a file under the `remote` directory.
#[test]
fn source_classification_matches_plugin_filename_shape() {
    let classify = |path: &str| journal_file_source_type(Path::new(path));
    // Bits shared by every local expectation below.
    let local = SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL;

    assert_eq!(
        classify("/var/log/journal/machine/system.journal"),
        local | SOURCE_TYPE_LOCAL_SYSTEM
    );
    assert_eq!(
        classify("/var/log/journal/machine/user-1000.journal"),
        local | SOURCE_TYPE_LOCAL_USER
    );
    assert_eq!(
        classify("/var/log/journal/machine/other.journal"),
        local | SOURCE_TYPE_LOCAL_OTHER
    );
    assert_eq!(
        classify("/var/log/journal/machine.namespace/system.journal"),
        local | SOURCE_TYPE_LOCAL_NAMESPACE
    );
    assert_eq!(
        classify("/var/log/journal/remote/remote-host-a@machine.journal"),
        SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
    );
}
6519
/// Pins the exact source name derived from a journal path: a namespaced
/// machine directory yields `namespace-<name>`, and remote files yield the
/// host part of the file name (with or without an `@machine` suffix and a
/// `.journal~.zst` extension).
#[test]
fn exact_source_names_follow_plugin_prefixes() {
    let exact_name = |path: &str| journal_file_exact_source_name(Path::new(path));

    assert_eq!(
        exact_name("/var/log/journal/machine.namespace/system.journal").as_deref(),
        Some("namespace-namespace")
    );
    assert_eq!(
        exact_name("/var/log/journal/remote/remote-host-a@machine.journal").as_deref(),
        Some("remote-host-a")
    );
    assert_eq!(
        exact_name("/var/log/journal/remote/remote-host-b.journal~.zst").as_deref(),
        Some("remote-host-b")
    );
}
6544
/// Active (`.journal`) and `~`-suffixed (`.journal~`) names, each with and
/// without a `.zst` suffix, must all be recognised as journal file names.
#[test]
fn disposed_journal_extension_matches_plugin_scan_contract() {
    let recognised = |name: &str| is_journal_file_name(Path::new(name));

    assert!(recognised("active.journal"));
    assert!(recognised("archived.journal~"));
    assert!(recognised("active.journal.zst"));
    assert!(recognised("archived.journal~.zst"));
}
6552
6553 fn test_located_row(realtime_usec: u64) -> LocatedRow {
6554 LocatedRow {
6555 file_path: PathBuf::from("test.journal"),
6556 row: ExplorerRow {
6557 realtime_usec,
6558 cursor: String::new(),
6559 payloads: Vec::new(),
6560 },
6561 }
6562 }
6563
6564 fn write_source_realtime_delta_journal(
6565 directory: &std::path::Path,
6566 name: &str,
6567 commit_realtime_usec: u64,
6568 source_realtime_usec: u64,
6569 ) {
6570 std::fs::create_dir_all(directory).expect("create journal dir");
6571 let path = directory.join(name);
6572 let repo_file = RepoFile::from_path(&path).expect("repo file");
6573 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6574 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6575 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6576 let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6577 let payloads: [&[u8]; 4] = [
6578 b"MESSAGE=source lag test".as_slice(),
6579 b"SERVICE=delta".as_slice(),
6580 b"PRIORITY=6".as_slice(),
6581 source.as_bytes(),
6582 ];
6583 writer
6584 .add_entry(
6585 &mut file,
6586 &payloads,
6587 commit_realtime_usec,
6588 commit_realtime_usec,
6589 )
6590 .expect("write entry");
6591 file.sync().expect("sync journal");
6592 }
6593}