1use crate::explorer::{ExplorerSamplingState, histogram_bucket_count_for_query};
2use crate::{
3 Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
4 ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
5 ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
6 FileReader, ReaderOptions, Result, SdkError,
7};
8use chrono::{DateTime, Utc};
9use serde_json::{Map, Value, json};
10use std::cell::RefCell;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
13#[cfg(unix)]
14use std::ffi::CStr;
15use std::path::{Path, PathBuf};
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
19const DEFAULT_ITEMS_TO_RETURN: usize = 200;
20const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
21const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
22const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
23const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
24const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
25const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
26const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
27const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
28const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
29const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
30const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
31const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
32const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
33const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
34const SOURCE_TYPE_ALL: u64 = 1 << 0;
35const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
36const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
37const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
38const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
39const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
40const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;
41
42pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
43pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
44pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
45pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
46pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
47pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
48pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
49
50const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
51 "info",
52 "__logs_sources",
53 "after",
54 "before",
55 "anchor",
56 "direction",
57 "last",
58 "query",
59 "facets",
60 "histogram",
61 "if_modified_since",
62 "data_only",
63 "delta",
64 "tail",
65 "sampling",
66 "slice",
67];
68
69const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
70 "_HOSTNAME",
71 "ND_JOURNAL_PROCESS",
72 "MESSAGE",
73 "PRIORITY",
74 "SYSLOG_FACILITY",
75 "ERRNO",
76 "ND_JOURNAL_FILE",
77 "SYSLOG_IDENTIFIER",
78 "UNIT",
79 "USER_UNIT",
80 "MESSAGE_ID",
81 "_BOOT_ID",
82 "_SYSTEMD_OWNER_UID",
83 "_UID",
84 "OBJECT_SYSTEMD_OWNER_UID",
85 "OBJECT_UID",
86 "_GID",
87 "OBJECT_GID",
88 "_CAP_EFFECTIVE",
89 "_AUDIT_LOGINUID",
90 "OBJECT_AUDIT_LOGINUID",
91 "_SOURCE_REALTIME_TIMESTAMP",
92];
93
94const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
95 "_HOSTNAME",
96 "PRIORITY",
97 "SYSLOG_FACILITY",
98 "ERRNO",
99 "SYSLOG_IDENTIFIER",
100 "UNIT",
101 "USER_UNIT",
102 "MESSAGE_ID",
103 "_BOOT_ID",
104 "_SYSTEMD_OWNER_UID",
105 "_UID",
106 "OBJECT_SYSTEMD_OWNER_UID",
107 "OBJECT_UID",
108 "_GID",
109 "OBJECT_GID",
110 "_AUDIT_LOGINUID",
111 "OBJECT_AUDIT_LOGINUID",
112 "CODE_FILE",
113 "_SYSTEMD_UNIT",
114 "_SYSTEMD_USER_SLICE",
115 "CODE_FUNC",
116 "_TRANSPORT",
117 "_COMM",
118 "_RUNTIME_SCOPE",
119 "_MACHINE_ID",
120 "_SYSTEMD_SLICE",
121 "UNIT_RESULT",
122 "_SYSTEMD_CGROUP",
123 "_EXE",
124 "_SYSTEMD_USER_UNIT",
125 "_SYSTEMD_SESSION",
126 "COREDUMP_CGROUP",
127 "COREDUMP_USER_UNIT",
128 "COREDUMP_UNIT",
129 "COREDUMP_SIGNAL_NAME",
130 "COREDUMP_COMM",
131 "_UDEV_DEVNODE",
132 "_KERNEL_SUBSYSTEM",
133 "OBJECT_EXE",
134 "OBJECT_SYSTEMD_CGROUP",
135 "OBJECT_COMM",
136 "OBJECT_SYSTEMD_UNIT",
137 "OBJECT_SYSTEMD_USER_UNIT",
138 "_SELINUX_CONTEXT",
139 "_NAMESPACE",
140 "OBJECT_SYSTEMD_SESSION",
141 "CONTAINER_ID",
142 "CONTAINER_NAME",
143 "CONTAINER_TAG",
144 "IMAGE_NAME",
145 "ND_NIDL_NODE",
146 "ND_NIDL_CONTEXT",
147 "ND_LOG_SOURCE",
148 "ND_ALERT_NAME",
149 "ND_ALERT_CLASS",
150 "ND_ALERT_COMPONENT",
151 "ND_ALERT_TYPE",
152 "ND_ALERT_STATUS",
153];
154
155#[derive(Debug, Clone)]
156pub struct NetdataFunctionConfig {
157 pub function_name: String,
158 pub default_facets: Vec<String>,
159 pub default_view_keys: Vec<String>,
160 pub default_histogram: Option<String>,
161 pub reader_options: ReaderOptions,
162 pub explorer_strategy: ExplorerStrategy,
163}
164
165impl NetdataFunctionConfig {
166 pub fn systemd_journal() -> Self {
167 Self {
168 function_name: DEFAULT_FUNCTION_NAME.to_string(),
169 default_facets: SYSTEMD_DEFAULT_FACETS
170 .iter()
171 .map(|field| (*field).to_string())
172 .collect(),
173 default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
174 .iter()
175 .map(|field| (*field).to_string())
176 .collect(),
177 default_histogram: Some("PRIORITY".to_string()),
178 reader_options: ReaderOptions::snapshot(),
179 explorer_strategy: ExplorerStrategy::Traversal,
180 }
181 }
182}
183
184impl Default for NetdataFunctionConfig {
185 fn default() -> Self {
186 Self::systemd_journal()
187 }
188}
189
190#[derive(Debug, Default)]
191pub struct DisplayContext {
192 boot_first_realtime: BTreeMap<Vec<u8>, u64>,
193 uid_display_cache: RefCell<BTreeMap<String, String>>,
194 gid_display_cache: RefCell<BTreeMap<String, String>>,
195}
196
197#[derive(Debug, Clone, Copy)]
198pub enum DisplayScope {
199 Data,
200 Facet,
201 Histogram,
202}
203
204pub trait NetdataFunctionProfile {
205 fn field_display_value(
206 &self,
207 _context: &DisplayContext,
208 _scope: DisplayScope,
209 _field: &str,
210 value: &[u8],
211 ) -> Value {
212 Value::String(String::from_utf8_lossy(value).into_owned())
213 }
214
215 fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
216 match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
217 Value::String(value) => value,
218 other => other.to_string(),
219 }
220 }
221
222 fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
223 if let Some(priority) = first_value(fields, "PRIORITY") {
224 return json!({ "severity": priority_to_row_severity(priority) });
225 }
226 json!({ "severity": "normal" })
227 }
228}
229
230#[derive(Debug, Clone, Copy, Default)]
231pub struct SystemdJournalProfile;
232
233#[derive(Debug, Clone, Copy, Default)]
234pub struct SystemdJournalPluginProfile;
235
236impl NetdataFunctionProfile for SystemdJournalProfile {
237 fn field_display_value(
238 &self,
239 context: &DisplayContext,
240 scope: DisplayScope,
241 field: &str,
242 value: &[u8],
243 ) -> Value {
244 systemd_field_display_value(context, scope, field, value, false)
245 }
246}
247
248impl NetdataFunctionProfile for SystemdJournalPluginProfile {
249 fn field_display_value(
250 &self,
251 context: &DisplayContext,
252 scope: DisplayScope,
253 field: &str,
254 value: &[u8],
255 ) -> Value {
256 systemd_field_display_value(context, scope, field, value, true)
257 }
258}
259
260#[derive(Debug, Clone)]
261pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
262 config: NetdataFunctionConfig,
263 profile: P,
264}
265
266#[derive(Debug, Clone)]
267pub struct NetdataFunctionProgress {
268 pub current_file: usize,
269 pub total_files: usize,
270 pub matched_files: u64,
271 pub skipped_files: u64,
272 pub stats: ExplorerStats,
273 pub elapsed: Duration,
274}
275
276#[derive(Debug, Clone, Default, PartialEq, Eq)]
277pub struct NetdataJournalFileMetadata {
278 pub source_type: Option<u64>,
279 pub source_name: Option<String>,
280 pub file_last_modified_usec: Option<u64>,
281 pub msg_first_realtime_usec: Option<u64>,
282 pub msg_last_realtime_usec: Option<u64>,
283 pub journal_vs_realtime_delta_usec: Option<u64>,
284}
285
286pub trait NetdataFunctionState {
287 fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
288 None
289 }
290
291 fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
292}
293
294pub struct NetdataFunctionRunOptions<'a> {
295 pub timeout: Option<Duration>,
296 pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
297 pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
298 pub state: Option<&'a mut dyn NetdataFunctionState>,
299 pub progress_interval: Duration,
300}
301
302impl NetdataFunctionRunOptions<'_> {
303 pub fn from_timeout_seconds(seconds: u64) -> Self {
304 let seconds = if seconds == 0 {
305 EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
306 } else {
307 seconds
308 };
309 Self {
310 timeout: Some(Duration::from_secs(seconds)),
311 progress_callback: None,
312 cancellation_callback: None,
313 state: None,
314 progress_interval: Duration::from_millis(250),
315 }
316 }
317}
318
319impl Default for NetdataFunctionRunOptions<'_> {
320 fn default() -> Self {
321 Self {
322 timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
323 progress_callback: None,
324 cancellation_callback: None,
325 state: None,
326 progress_interval: Duration::from_millis(250),
327 }
328 }
329}
330
331impl NetdataJournalFunction<SystemdJournalProfile> {
332 pub fn systemd_journal() -> Self {
333 Self {
334 config: NetdataFunctionConfig::systemd_journal(),
335 profile: SystemdJournalProfile,
336 }
337 }
338}
339
340impl NetdataJournalFunction<SystemdJournalPluginProfile> {
341 pub fn systemd_journal_plugin_compatible() -> Self {
342 Self {
343 config: NetdataFunctionConfig::systemd_journal(),
344 profile: SystemdJournalPluginProfile,
345 }
346 }
347}
348
349impl<P> NetdataJournalFunction<P>
350where
351 P: NetdataFunctionProfile,
352{
353 pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
354 Self { config, profile }
355 }
356
357 pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
358 self.run_directory_request_json_with_options(
359 directory,
360 request,
361 NetdataFunctionRunOptions::default(),
362 )
363 }
364
365 pub fn run_directory_request_json_with_options(
366 &self,
367 directory: &Path,
368 request: &Value,
369 mut options: NetdataFunctionRunOptions<'_>,
370 ) -> Result<Value> {
371 let request = NetdataRequest::parse(request, &self.config)?;
372 let collection = collect_journal_files(directory)?;
373 let paths = collection.files;
374 if request.info {
375 return Ok(self.info_response(request.echo, &paths, &options));
376 }
377 let annotation_paths = paths.clone();
378
379 let selected =
380 select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
381 if let Some(response) = not_modified_before_scan_response(&request, &selected) {
382 return Ok(response);
383 }
384 let selected_files = selected.files;
385 let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
386 let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
387 self.finalize_combined_result(
388 &request,
389 &selected_files,
390 deadline,
391 &mut options,
392 &mut combined,
393 collection.skipped,
394 collection.errors,
395 )?;
396 Ok(self.query_response(request, &annotation_paths, combined))
397 }
398
399 fn finalize_combined_result(
400 &self,
401 request: &NetdataRequest,
402 selected_files: &[SelectedJournalFile],
403 deadline: Option<Instant>,
404 options: &mut NetdataFunctionRunOptions<'_>,
405 combined: &mut CombinedResult,
406 skipped_files: u64,
407 file_errors: Vec<String>,
408 ) -> Result<()> {
409 combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
410 combined.file_errors.extend(file_errors);
411 if combined.cancelled {
412 return Ok(());
413 }
414 if !request.data_only {
415 combined.add_zero_count_facet_values_from_files(
416 &request.facets,
417 self.config.reader_options,
418 );
419 combined.add_zero_count_selected_filter_values(request);
420 }
421 if should_collect_unfiltered_facet_vocabulary(request, combined) {
422 let vocabulary = self.explore_files(
423 selected_files,
424 &request.unfiltered_vocabulary(),
425 deadline,
426 options,
427 )?;
428 combined.add_zero_count_facet_values(&vocabulary.facets);
429 }
430 Ok(())
431 }
432
433 pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
434 self.run_directory_request_bytes_with_options(
435 directory,
436 request,
437 NetdataFunctionRunOptions::default(),
438 )
439 }
440
441 pub fn run_directory_request_bytes_with_options(
442 &self,
443 directory: &Path,
444 request: &[u8],
445 options: NetdataFunctionRunOptions<'_>,
446 ) -> Result<Value> {
447 let request: Value = serde_json::from_slice(request).map_err(|err| {
448 SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
449 })?;
450 self.run_directory_request_json_with_options(directory, &request, options)
451 }
452
453 fn explore_files(
454 &self,
455 files: &[SelectedJournalFile],
456 request: &NetdataRequest,
457 deadline: Option<Instant>,
458 options: &mut NetdataFunctionRunOptions<'_>,
459 ) -> Result<CombinedResult> {
460 let query = request.to_explorer_query(
461 files.len() as u64,
462 None,
463 NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
464 );
465 let mut combined = CombinedResult::default();
466 let page_window = RefCell::new(NetdataPageWindow::for_request(request));
467 combined.sampling_enabled = query.sampling.is_some();
468 let mut sampling_state =
469 ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
470 let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
471 let started = Instant::now();
472 let total_files = files.len();
473 for (file_index, file) in files.iter().enumerate() {
474 let path = &file.path;
475 if should_stop_before_file(&mut combined, deadline, options) {
476 break;
477 }
478 let Some(mut reader) = self.open_file_for_explore(
479 path,
480 &mut combined,
481 options,
482 progress_context(file_index, total_files, started),
483 )?
484 else {
485 continue;
486 };
487 combined.matched_files = combined.matched_files.saturating_add(1);
488 combined.matched_paths.push(path.clone());
489 let query = request.file_query(files.len(), reader.header(), &file.order);
490 collect_column_fields_for_file(&mut reader, request, path, &mut combined);
491 let explored = self.explore_single_file(
492 &mut reader,
493 request,
494 &query,
495 deadline,
496 options,
497 &combined,
498 &page_window,
499 sampling_state.as_mut(),
500 &realtime_adjuster,
501 progress_context(file_index, total_files, started),
502 file.order.journal_vs_realtime_delta_usec,
503 );
504 let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
505 else {
506 continue;
507 };
508 if finish_explored_file(
509 options,
510 request,
511 file,
512 &query,
513 result,
514 stop_reason,
515 &mut combined,
516 files,
517 file_index,
518 progress_context(file_index, total_files, started),
519 )? {
520 break;
521 }
522 }
523 combined.expand_row_payloads(self.config.reader_options);
524 combined.page_counters = Some(page_window.into_inner().counters());
525 Ok(combined)
526 }
527
528 fn open_file_for_explore(
529 &self,
530 path: &Path,
531 combined: &mut CombinedResult,
532 options: &mut NetdataFunctionRunOptions<'_>,
533 progress: ProgressContext,
534 ) -> Result<Option<FileReader>> {
535 match FileReader::open_with_options(path, self.config.reader_options) {
536 Ok(reader) => Ok(Some(reader)),
537 Err(err) => {
538 combined.skipped_files = combined.skipped_files.saturating_add(1);
539 combined
540 .file_errors
541 .push(format!("{}: {err}", path.display()));
542 emit_progress_for_combined(options, combined, progress);
543 Ok(None)
544 }
545 }
546 }
547
548 #[allow(clippy::too_many_arguments)]
549 fn explore_single_file(
550 &self,
551 reader: &mut FileReader,
552 request: &NetdataRequest,
553 query: &ExplorerQuery,
554 deadline: Option<Instant>,
555 options: &mut NetdataFunctionRunOptions<'_>,
556 combined: &CombinedResult,
557 page_window: &RefCell<NetdataPageWindow>,
558 sampling_state: Option<&mut ExplorerSamplingState>,
559 realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
560 progress: ProgressContext,
561 realtime_delta_usec: u64,
562 ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
563 let cancellation_callback = options.cancellation_callback;
564 let progress_interval = options.progress_interval;
565 let mut explorer_progress = |explorer_progress: ExplorerProgress| {
566 emit_explorer_progress(options, combined, explorer_progress, progress);
567 };
568 let mut control = ExplorerControl::new();
569 control.set_deadline(deadline);
570 control.set_cancellation_callback(cancellation_callback);
571 control.set_progress_interval(progress_interval);
572 control.set_progress_callback(Some(&mut explorer_progress));
573 control.set_sampling_state(sampling_state);
574 let mut candidate_row =
575 |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
576 control.set_candidate_row_callback(Some(&mut candidate_row));
577 let mut adjust_realtime =
578 |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
579 control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
580 let mut matched_row = |realtime_usec, rows_matched| {
581 delta_scan_can_stop(
582 request,
583 page_window,
584 realtime_usec,
585 rows_matched,
586 realtime_delta_usec,
587 )
588 };
589 control.set_matched_row_callback(Some(&mut matched_row));
590 let result = reader.explore_with_strategy_cursor_rows_controlled(
591 query,
592 self.config.explorer_strategy,
593 &mut control,
594 )?;
595 Ok((result, control.stop_reason()))
596 }
597
598 fn info_response(
599 &self,
600 echo: Value,
601 paths: &[PathBuf],
602 options: &NetdataFunctionRunOptions<'_>,
603 ) -> Value {
604 json!({
605 "_request": echo,
606 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
607 "v": 3,
608 "accepted_params": self.accepted_params_from_fields(&[]),
609 "required_params": self.required_source_params(paths, options),
610 "show_ids": true,
611 "has_history": true,
612 "pagination": {
613 "enabled": true,
614 "key": "anchor",
615 "column": "timestamp",
616 "units": "timestamp_usec",
617 },
618 "status": 200,
619 "type": "table",
620 "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
621 })
622 }
623
624 fn query_response(
625 &self,
626 request: NetdataRequest,
627 annotation_paths: &[PathBuf],
628 combined: CombinedResult,
629 ) -> Value {
630 let not_modified = request.if_modified_since_usec != 0
633 && !combined.partial
634 && combined.stats.rows_matched == 0;
635 if combined.cancelled {
636 return netdata_function_error(499, "Request cancelled.");
637 }
638 if not_modified {
639 return netdata_function_error(304, "No new data since the previous call.");
640 }
641 let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
642 let mut response = base_query_response(&request, &combined, &artifacts);
643 let Some(object) = response.as_object_mut() else {
644 return netdata_function_error(500, "Internal Netdata function response error.");
645 };
646 self.add_query_response_metadata(object, &request, &combined, artifacts);
647 response
648 }
649
650 fn query_response_artifacts(
651 &self,
652 request: &NetdataRequest,
653 annotation_paths: &[PathBuf],
654 combined: &CombinedResult,
655 ) -> QueryResponseArtifacts {
656 let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
657 let reportable_facet_field_names = string_fields(&reportable_facet_fields);
658 let columns = self.build_columns(
659 &request,
660 &reportable_facet_fields,
661 &combined.rows,
662 &combined.facets,
663 &combined.column_fields,
664 );
665 let boot_ids = response_boot_ids(
666 &columns.order,
667 &combined.rows,
668 &combined.facets,
669 combined.histogram.as_ref(),
670 );
671 let context = DisplayContext {
672 boot_first_realtime: collect_boot_first_realtime(
673 annotation_paths,
674 self.config.reader_options,
675 &boot_ids,
676 ),
677 ..DisplayContext::default()
678 };
679 let data =
680 self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
681 let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
682 let histogram = combined.histogram.as_ref().map(|histogram| {
683 self.build_histogram(&context, histogram, combined.facets.get(&histogram.field))
684 });
685 let message = query_message(combined.timed_out, &combined.stats);
686 let items = response_items(request, combined, data.len() as u64);
687 QueryResponseArtifacts {
688 reportable_facet_field_names,
689 columns,
690 data,
691 facets,
692 histogram,
693 message,
694 items,
695 }
696 }
697
698 fn add_query_response_metadata(
699 &self,
700 object: &mut Map<String, Value>,
701 request: &NetdataRequest,
702 combined: &CombinedResult,
703 artifacts: QueryResponseArtifacts,
704 ) {
705 if !request.data_only {
706 self.add_full_query_response_metadata(object, request, combined, &artifacts);
707 } else if request.histogram.is_some() {
708 object.insert(
709 "available_histograms".to_string(),
710 self.available_histograms(request, combined),
711 );
712 }
713 add_last_modified_if_needed(object, request, combined);
714 add_sampling_if_needed(object, combined);
715 add_analysis_outputs_if_needed(object, request, artifacts);
716 }
717
718 fn add_full_query_response_metadata(
719 &self,
720 object: &mut Map<String, Value>,
721 request: &NetdataRequest,
722 combined: &CombinedResult,
723 artifacts: &QueryResponseArtifacts,
724 ) {
725 object.insert("message".to_string(), artifacts.message.clone());
726 object.insert("update_every".to_string(), Value::from(1));
727 object.insert("help".to_string(), Value::Null);
728 object.insert(
729 "accepted_params".to_string(),
730 self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
731 );
732 object.insert("default_sort_column".to_string(), Value::from("timestamp"));
733 object.insert("default_charts".to_string(), Value::Array(Vec::new()));
734 object.insert(
735 "available_histograms".to_string(),
736 self.available_histograms(request, combined),
737 );
738 }
739
740 fn build_columns(
741 &self,
742 request: &NetdataRequest,
743 reportable_facet_fields: &[Vec<u8>],
744 rows: &[LocatedRow],
745 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
746 column_fields: &BTreeSet<String>,
747 ) -> Columns {
748 let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
749 push_unique_many(&mut order, &self.config.default_view_keys);
750 push_unique_many(&mut order, &string_fields(reportable_facet_fields));
751 if let Some(histogram) = &request.histogram {
752 push_unique(&mut order, histogram);
753 }
754 for field in column_fields {
755 push_unique(&mut order, field);
756 }
757
758 for (field, values) in facets {
759 if !facet_group_is_reportable(values) {
760 continue;
761 }
762 push_unique(&mut order, &String::from_utf8_lossy(field));
763 }
764 for row in rows {
765 let fields = row_fields(row);
766 for field in fields.keys() {
767 push_unique(&mut order, field);
768 }
769 }
770
771 let mut map = Map::new();
772 for (index, key) in order.iter().enumerate() {
773 map.insert(key.clone(), column_metadata(key, index));
774 }
775 Columns { order, map }
776 }
777
778 fn build_data_rows(
779 &self,
780 context: &DisplayContext,
781 column_order: &[String],
782 rows: &[LocatedRow],
783 direction: Direction,
784 ) -> Vec<Value> {
785 let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
786 Direction::Forward => Box::new(rows.iter().rev()),
787 Direction::Backward => Box::new(rows.iter()),
788 };
789 row_iter
790 .map(|located| {
791 let fields = row_fields(located);
792 let mut row = Vec::with_capacity(column_order.len());
793 for column in column_order {
794 let value = match column.as_str() {
795 "timestamp" => Value::from(located.row.realtime_usec),
796 "rowOptions" => self.profile.row_options(&fields),
797 field => first_value(&fields, field)
798 .map(|value| {
799 self.profile.field_display_value(
800 context,
801 DisplayScope::Data,
802 field,
803 value,
804 )
805 })
806 .unwrap_or(Value::Null),
807 };
808 row.push(value);
809 }
810 Value::Array(row)
811 })
812 .collect()
813 }
814
815 fn build_facets(
816 &self,
817 context: &DisplayContext,
818 requested: &[Vec<u8>],
819 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
820 ) -> Value {
821 let mut out = Vec::new();
822 for (order, field) in requested.iter().enumerate() {
823 let values = facets.get(field);
824 let field_name = String::from_utf8_lossy(field).into_owned();
825 let mut options: Vec<_> = values
826 .into_iter()
827 .flat_map(|values| values.iter())
828 .filter(|(value, count)| {
829 (!value.is_empty() && value.as_slice() != b"-")
830 || (**count == 0 && value.is_empty())
831 })
832 .map(|(value, count)| {
833 if *count == 0 && value.is_empty() {
834 return json!({
835 "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
836 "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
837 "count": count,
838 });
839 }
840 json!({
841 "id": String::from_utf8_lossy(value).into_owned(),
842 "name": self.profile.facet_option_name(context, &field_name, value),
843 "count": count,
844 })
845 })
846 .collect();
847 sort_facet_options(&field_name, &mut options);
848 for (idx, option) in options.iter_mut().enumerate() {
849 if let Some(object) = option.as_object_mut() {
850 object.insert("order".to_string(), Value::from((idx + 1) as u64));
851 }
852 }
853 out.push(json!({
854 "id": field_name,
855 "name": String::from_utf8_lossy(field).into_owned(),
856 "order": order + 1,
857 "options": options,
858 }));
859 }
860 Value::Array(out)
861 }
862
863 fn build_histogram(
864 &self,
865 context: &DisplayContext,
866 histogram: &ExplorerHistogram,
867 known_values: Option<&BTreeMap<Vec<u8>, u64>>,
868 ) -> Value {
869 let field = String::from_utf8_lossy(&histogram.field).into_owned();
870 let mut dimension_ids = BTreeSet::new();
871 let mut buckets = Vec::with_capacity(histogram.buckets.len());
872 for bucket in &histogram.buckets {
873 let mut values = BTreeMap::new();
874 for (value, count) in &bucket.values {
875 add_netdata_facet_count(&mut values, value, *count);
876 }
877 for value in values.keys() {
878 dimension_ids.insert(value.clone());
879 }
880 buckets.push((bucket.start_realtime_usec, values));
881 }
882 let actual_dimension_ids = dimension_ids.clone();
883 if let Some(known_values) = known_values {
884 for value in known_values.keys() {
885 if value.is_empty() || value.as_slice() == b"-" {
886 continue;
887 }
888 dimension_ids.insert(value.clone());
889 }
890 }
891 let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
892 let labels: Vec<Value> = std::iter::once(Value::String("time".to_string()))
893 .chain(dimension_ids.iter().map(|value| {
894 match self.profile.field_display_value(
895 context,
896 DisplayScope::Histogram,
897 &field,
898 value,
899 ) {
900 Value::String(value) => Value::String(value),
901 other => Value::String(other.to_string()),
902 }
903 }))
904 .collect();
905 let data: Vec<Value> = buckets
906 .iter()
907 .map(|(start_realtime_usec, values)| {
908 let mut point = Vec::with_capacity(dimension_ids.len() + 1);
909 point.push(Value::from(start_realtime_usec / 1000));
910 for value in &dimension_ids {
911 let count = values
912 .get(value)
913 .copied()
914 .map(Value::from)
915 .unwrap_or_else(|| {
916 if actual_dimension_ids.contains(value) {
917 Value::from(0)
918 } else {
919 Value::Null
920 }
921 });
922 point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
923 }
924 Value::Array(point)
925 })
926 .collect();
927
928 json!({
929 "id": field,
930 "name": field,
931 "chart": {
932 "result": {
933 "labels": labels,
934 "point": { "value": 0, "arp": 1, "pa": 2 },
935 "data": data,
936 },
937 "view": {
938 "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
939 "update_every": histogram_update_every_seconds(histogram),
940 "units": "events",
941 "chart_type": "stackedBar",
942 }
943 }
944 })
945 }
946
947 fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
948 NETDATA_ACCEPTED_PARAMS
949 .iter()
950 .copied()
951 .chain(fields.iter().map(String::as_str))
952 .map(|field| Value::String(field.to_string()))
953 .collect()
954 }
955
956 fn required_source_params(
957 &self,
958 paths: &[PathBuf],
959 options: &NetdataFunctionRunOptions<'_>,
960 ) -> Value {
961 let mut all = JournalSourceSummary::default();
962 let mut local = JournalSourceSummary::default();
963 let mut local_namespaces = JournalSourceSummary::default();
964 let mut local_system = JournalSourceSummary::default();
965 let mut local_user = JournalSourceSummary::default();
966 let mut remote = JournalSourceSummary::default();
967 let mut other = JournalSourceSummary::default();
968 let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
969
970 for path in paths {
971 let metadata = file_metadata(options, path);
972 let source_type = metadata
973 .as_ref()
974 .and_then(|metadata| metadata.source_type)
975 .unwrap_or_else(|| journal_file_source_type(path));
976 all.add_path(path, self.config.reader_options, metadata.as_ref());
977 if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
978 local.add_path(path, self.config.reader_options, metadata.as_ref());
979 }
980 if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
981 local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
982 }
983 if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
984 local_system.add_path(path, self.config.reader_options, metadata.as_ref());
985 }
986 if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
987 local_user.add_path(path, self.config.reader_options, metadata.as_ref());
988 }
989 if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
990 remote.add_path(path, self.config.reader_options, metadata.as_ref());
991 }
992 if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
993 other.add_path(path, self.config.reader_options, metadata.as_ref());
994 }
995 let source_name = metadata
996 .as_ref()
997 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
998 .or_else(|| journal_file_exact_source_name(path));
999 if let Some(source_name) = source_name {
1000 exact.entry(source_name).or_default().add_path(
1001 path,
1002 self.config.reader_options,
1003 metadata.as_ref(),
1004 );
1005 }
1006 }
1007
1008 let mut source_options = Vec::new();
1009 push_source_option(&mut source_options, "all", &all);
1010 push_source_option(&mut source_options, "all-local-logs", &local);
1011 push_source_option(
1012 &mut source_options,
1013 "all-local-namespaces",
1014 &local_namespaces,
1015 );
1016 push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1017 push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1018 push_source_option(&mut source_options, "all-remote-systems", &remote);
1019 push_source_option(&mut source_options, "all-uncategorized", &other);
1020 for (name, summary) in exact {
1021 push_source_option(&mut source_options, &name, &summary);
1022 }
1023
1024 json!([{
1025 "id": "__logs_sources",
1026 "name": "Journal Sources",
1027 "help": "Select the logs source to query",
1028 "type": "multiselect",
1029 "options": source_options,
1030 }])
1031 }
1032
1033 fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1034 let mut fields = combined.reportable_facet_fields(&request.facets);
1035 if request.data_only {
1036 if let Some(histogram) = &request.histogram {
1037 push_unique(&mut fields, histogram);
1038 }
1039 }
1040 let mut sorted = fields.clone();
1041 sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1042 let order_by_field: BTreeMap<String, usize> = sorted
1043 .into_iter()
1044 .enumerate()
1045 .map(|(index, field)| (field, index + 1))
1046 .collect();
1047
1048 fields
1049 .into_iter()
1050 .map(|field| {
1051 let order = order_by_field.get(&field).copied().unwrap_or(0);
1052 json!({
1053 "id": field,
1054 "name": field,
1055 "order": order,
1056 })
1057 })
1058 .collect()
1059 }
1060}
1061
1062#[derive(Debug, Clone)]
1063struct NetdataRequest {
1064 info: bool,
1065 echo: Value,
1066 after_realtime_usec: Option<u64>,
1067 before_realtime_usec: Option<u64>,
1068 if_modified_since_usec: u64,
1069 anchor: ExplorerAnchor,
1070 direction: Direction,
1071 limit: usize,
1072 data_only: bool,
1073 delta: bool,
1074 tail: bool,
1075 sampling: u64,
1076 source_type: u64,
1077 exact_sources: Vec<String>,
1078 filters: Vec<ExplorerFilter>,
1079 facets: Vec<Vec<u8>>,
1080 histogram: Option<String>,
1081 fts_terms: Vec<ExplorerFtsPattern>,
1082 fts_patterns: Vec<Vec<u8>>,
1083 fts_negative_patterns: Vec<Vec<u8>>,
1084}
1085
1086impl NetdataRequest {
1087 fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
1088 let object = value.as_object().ok_or_else(|| {
1089 SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
1090 })?;
1091 let now_seconds = unix_now_seconds();
1092 let info = get_bool(object, "info").unwrap_or(false);
1093 let after = get_i64(object, "after");
1094 let before = get_i64(object, "before");
1095 let (after_realtime_usec, before_realtime_usec) =
1096 normalize_time_window(now_seconds, after, before);
1097 let direction = request_direction(object);
1098 let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
1099 let data_only = get_bool(object, "data_only").unwrap_or(false);
1100 let delta = request_delta(data_only, object);
1101 let tail = request_tail(data_only, if_modified_since_usec, object);
1102 let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
1103 let (anchor, direction) = request_anchor_and_direction(
1104 object,
1105 tail,
1106 direction,
1107 after_realtime_usec,
1108 before_realtime_usec,
1109 );
1110 let requested_limit = request_limit(object);
1111 let limit = requested_limit.max(2);
1112 let requested_facets = parse_string_array(object.get("facets"));
1113 let facets = request_facets(&requested_facets, config);
1114 let requested_histogram = request_histogram(object);
1115 let histogram = request_histogram_or_default(&requested_histogram, config);
1116 let requested_query = request_query(object);
1117 let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
1118 .as_deref()
1119 .map(parse_fts_query_patterns)
1120 .unwrap_or_default();
1121 let source_selection = parse_source_selection(object.get("selections"));
1122 let filters = parse_filters(object.get("selections"));
1123
1124 let echo_input = RequestEchoInput {
1125 info,
1126 after_realtime_usec,
1127 before_realtime_usec,
1128 if_modified_since_usec,
1129 anchor,
1130 direction,
1131 limit: requested_limit,
1132 data_only,
1133 delta,
1134 tail,
1135 sampling,
1136 source_type: source_selection.source_type,
1137 requested_facets: requested_facets.as_deref(),
1138 selections: object.get("selections"),
1139 histogram: requested_histogram.as_deref(),
1140 query: requested_query.as_deref(),
1141 };
1142 let echo = normalized_request_echo(&echo_input);
1143
1144 Ok(Self {
1145 info,
1146 echo,
1147 after_realtime_usec,
1148 before_realtime_usec,
1149 if_modified_since_usec,
1150 anchor,
1151 direction,
1152 limit,
1153 data_only,
1154 delta,
1155 tail,
1156 sampling,
1157 source_type: source_selection.source_type,
1158 exact_sources: source_selection.exact_sources,
1159 filters,
1160 facets,
1161 histogram,
1162 fts_terms,
1163 fts_patterns,
1164 fts_negative_patterns,
1165 })
1166 }
1167
1168 fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1169 if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1170 return true;
1171 }
1172 if self.source_type & SOURCE_TYPE_ALL != 0 {
1173 return true;
1174 }
1175 let file_source_type = metadata
1176 .and_then(|metadata| metadata.source_type)
1177 .unwrap_or_else(|| journal_file_source_type(path));
1178 if file_source_type & self.source_type != 0 {
1179 return true;
1180 }
1181 if self.exact_sources.is_empty() {
1182 return false;
1183 }
1184 let source_name = metadata
1185 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1186 .or_else(|| journal_file_exact_source_name(path));
1187 self.exact_sources
1188 .iter()
1189 .any(|source| source_name.as_deref() == Some(source.as_str()))
1190 }
1191
1192 fn to_explorer_query(
1193 &self,
1194 matched_files: u64,
1195 file_header: Option<FileHeader>,
1196 realtime_slack_usec: u64,
1197 ) -> ExplorerQuery {
1198 let analysis_enabled = !self.data_only || self.delta;
1199 let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
1200 let sampling = (analysis_enabled
1201 && self.sampling != 0
1202 && matched_files != 0
1203 && self.after_realtime_usec.is_some()
1204 && self.before_realtime_usec.is_some())
1205 .then(|| {
1206 let header = file_header.unwrap_or(FileHeader {
1207 signature: [0; 8],
1208 compatible_flags: 0,
1209 incompatible_flags: 0,
1210 state: 0,
1211 header_size: 0,
1212 n_entries: 0,
1213 head_entry_realtime: 0,
1214 tail_entry_realtime: 0,
1215 head_entry_seqnum: 0,
1216 tail_entry_seqnum: 0,
1217 tail_entry_boot_id: [0; 16],
1218 seqnum_id: [0; 16],
1219 });
1220 let messages_in_file = header
1221 .tail_entry_seqnum
1222 .checked_sub(header.head_entry_seqnum)
1223 .and_then(|span| span.checked_add(1))
1224 .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
1225 .unwrap_or(header.n_entries);
1226 ExplorerSampling {
1227 budget: self.sampling,
1228 matched_files,
1229 file_head_realtime_usec: header.head_entry_realtime,
1230 file_tail_realtime_usec: header.tail_entry_realtime,
1231 file_head_seqnum: header.head_entry_seqnum,
1232 file_tail_seqnum: header.tail_entry_seqnum,
1233 file_entries: messages_in_file,
1234 }
1235 });
1236 ExplorerQuery {
1237 after_realtime_usec: self.after_realtime_usec,
1238 before_realtime_usec: self.before_realtime_usec,
1239 anchor: self.anchor,
1240 direction: self.direction,
1241 limit: self.limit,
1242 filters: self.filters.clone(),
1243 facets: analysis_enabled
1244 .then(|| self.facets.clone())
1245 .unwrap_or_default(),
1246 histogram: analysis_enabled
1247 .then(|| {
1248 self.histogram
1249 .as_ref()
1250 .map(|field| field.as_bytes().to_vec())
1251 })
1252 .flatten(),
1253 histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
1254 fts_terms: self.fts_terms.clone(),
1255 fts_patterns: self.fts_patterns.clone(),
1256 fts_negative_patterns: self.fts_negative_patterns.clone(),
1257 field_mode: ExplorerFieldMode::FirstValue,
1258 exclude_facet_field_filters: self.distinct_filter_fields() > 1,
1259 use_source_realtime: true,
1260 realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
1261 stop_when_rows_full: self.data_only && !tail_anchor,
1262 stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
1263 sampling,
1264 debug_collect_column_fields_by_row_traversal: false,
1265 }
1266 }
1267
1268 fn file_query(
1269 &self,
1270 matched_files: usize,
1271 file_header: FileHeader,
1272 order: &JournalFileOrderInfo,
1273 ) -> ExplorerQuery {
1274 let mut query = self.to_explorer_query(
1275 matched_files as u64,
1276 Some(file_header),
1277 order.journal_vs_realtime_delta_usec,
1278 );
1279 if self.data_only && self.delta {
1280 query.stop_when_rows_full = false;
1281 }
1282 query
1283 }
1284
1285 fn unfiltered_vocabulary(&self) -> Self {
1286 let mut request = self.clone();
1287 request.filters.clear();
1288 request.histogram = None;
1289 request.limit = 0;
1290 request.fts_terms.clear();
1291 request.fts_patterns.clear();
1292 request.fts_negative_patterns.clear();
1293 request
1294 }
1295
1296 fn distinct_filter_fields(&self) -> usize {
1297 self.filters
1298 .iter()
1299 .map(|filter| filter.field.as_slice())
1300 .collect::<HashSet<_>>()
1301 .len()
1302 }
1303}
1304
1305#[derive(Debug, Clone)]
1306struct LocatedRow {
1307 file_path: PathBuf,
1308 row: ExplorerRow,
1309}
1310
1311#[derive(Debug)]
1312struct NetdataRealtimeAdjuster {
1313 direction: Direction,
1314 last_realtime_from: u64,
1315 last_realtime_to: u64,
1316}
1317
1318impl NetdataRealtimeAdjuster {
1319 fn new(direction: Direction) -> Self {
1320 Self {
1321 direction,
1322 last_realtime_from: 0,
1323 last_realtime_to: 0,
1324 }
1325 }
1326
1327 fn adjust(&mut self, realtime_usec: u64) -> u64 {
1328 match self.direction {
1329 Direction::Backward => {
1330 if realtime_usec >= self.last_realtime_from
1331 && realtime_usec <= self.last_realtime_to
1332 {
1333 self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1334 self.last_realtime_from
1335 } else {
1336 self.last_realtime_from = realtime_usec;
1337 self.last_realtime_to = realtime_usec;
1338 realtime_usec
1339 }
1340 }
1341 Direction::Forward => {
1342 if realtime_usec >= self.last_realtime_from
1343 && realtime_usec <= self.last_realtime_to
1344 {
1345 self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1346 self.last_realtime_to
1347 } else {
1348 self.last_realtime_from = realtime_usec;
1349 self.last_realtime_to = realtime_usec;
1350 realtime_usec
1351 }
1352 }
1353 }
1354 }
1355}
1356
1357#[derive(Debug, Default)]
1358struct JournalSourceSummary {
1359 files: u64,
1360 total_size: u64,
1361 first_realtime_usec: Option<u64>,
1362 last_realtime_usec: Option<u64>,
1363}
1364
1365impl JournalSourceSummary {
1366 #[cfg(test)]
1367 fn from_paths(
1368 paths: &[PathBuf],
1369 reader_options: ReaderOptions,
1370 options: &NetdataFunctionRunOptions<'_>,
1371 ) -> Self {
1372 let mut summary = Self::default();
1373 for path in paths {
1374 let metadata = file_metadata(options, path);
1375 summary.add_path(path, reader_options, metadata.as_ref());
1376 }
1377 summary
1378 }
1379
1380 fn add_path(
1381 &mut self,
1382 path: &Path,
1383 reader_options: ReaderOptions,
1384 metadata: Option<&NetdataJournalFileMetadata>,
1385 ) {
1386 if let Ok(metadata) = std::fs::metadata(path) {
1387 self.files = self.files.saturating_add(1);
1388 self.total_size = self.total_size.saturating_add(metadata.len());
1389 }
1390 if let Some(metadata) = metadata {
1391 let metadata_first = metadata.msg_first_realtime_usec;
1392 let metadata_last = metadata.msg_last_realtime_usec;
1393 if let Some(first) = metadata_first {
1394 self.first_realtime_usec = Some(
1395 self.first_realtime_usec
1396 .map_or(first, |current| current.min(first)),
1397 );
1398 }
1399 if let Some(last) = metadata_last {
1400 self.last_realtime_usec = Some(
1401 self.last_realtime_usec
1402 .map_or(last, |current| current.max(last)),
1403 );
1404 }
1405 if metadata_first.is_some() && metadata_last.is_some() {
1406 return;
1407 }
1408 }
1409 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1410 return;
1411 };
1412 let header = reader.header();
1413 if header.head_entry_realtime != 0 {
1414 self.first_realtime_usec = Some(
1415 self.first_realtime_usec
1416 .map_or(header.head_entry_realtime, |current| {
1417 current.min(header.head_entry_realtime)
1418 }),
1419 );
1420 }
1421 if header.tail_entry_realtime != 0 {
1422 self.last_realtime_usec = Some(
1423 self.last_realtime_usec
1424 .map_or(header.tail_entry_realtime, |current| {
1425 current.max(header.tail_entry_realtime)
1426 }),
1427 );
1428 }
1429 }
1430
1431 fn info(&self) -> String {
1432 let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1433 (Some(first), Some(last)) if last >= first => {
1434 human_duration_seconds((last - first) / 1_000_000)
1435 }
1436 _ => "0s".to_string(),
1437 };
1438 let last_entry = self
1439 .last_realtime_usec
1440 .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1441 .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1442 .unwrap_or_else(|| "unknown".to_string());
1443 format!(
1444 "{} files, total size {}, covering {}, last entry at {}",
1445 self.files,
1446 human_binary_size(self.total_size),
1447 coverage,
1448 last_entry
1449 )
1450 }
1451}
1452
1453fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1454 if summary.files == 0 {
1455 return;
1456 }
1457 target.push(json!({
1458 "id": id,
1459 "name": id,
1460 "info": summary.info(),
1461 "pill": human_binary_size(summary.total_size),
1462 }));
1463}
1464
1465fn expand_located_row_payloads(
1466 located: &mut LocatedRow,
1467 reader_options: ReaderOptions,
1468) -> Result<()> {
1469 let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1470 reader.seek_cursor(&located.row.cursor)?;
1471 if !reader.test_cursor(&located.row.cursor)? {
1472 return Err(SdkError::InvalidCursor(format!(
1473 "selected row cursor is no longer available: {}",
1474 located.row.cursor
1475 )));
1476 }
1477 reader.collect_entry_payloads(&mut located.row.payloads)
1478}
1479
1480#[derive(Debug, Default)]
1481struct CombinedResult {
1482 rows: Vec<LocatedRow>,
1483 facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1484 histogram: Option<ExplorerHistogram>,
1485 column_fields: BTreeSet<String>,
1486 stats: ExplorerStats,
1487 page_counters: Option<NetdataPageCounters>,
1488 matched_files: u64,
1489 matched_paths: Vec<PathBuf>,
1490 skipped_files: u64,
1491 file_errors: Vec<String>,
1492 partial: bool,
1493 timed_out: bool,
1494 cancelled: bool,
1495 sampling_enabled: bool,
1496}
1497
1498#[derive(Clone, Copy, Debug, Default)]
1499struct NetdataPageCounters {
1500 matched: u64,
1501 before: u64,
1502 after: u64,
1503}
1504
1505#[derive(Clone, Copy)]
1506struct ProgressContext {
1507 current_file: usize,
1508 total_files: usize,
1509 started: Instant,
1510}
1511
1512#[derive(Debug)]
1513enum NetdataPageHeap {
1514 Backward(BinaryHeap<Reverse<u64>>),
1515 Forward(BinaryHeap<u64>),
1516}
1517
1518#[derive(Debug)]
1519struct NetdataPageWindow {
1520 direction: Direction,
1521 anchor_start_usec: Option<u64>,
1522 anchor_stop_usec: Option<u64>,
1523 limit: usize,
1524 heap: NetdataPageHeap,
1525 oldest_retained_usec: Option<u64>,
1526 newest_retained_usec: Option<u64>,
1527 matched: u64,
1528 skips_before: u64,
1529 skips_after: u64,
1530 shifts: u64,
1531}
1532
1533impl NetdataPageWindow {
1534 fn for_request(request: &NetdataRequest) -> Self {
1535 let anchor_start_usec = match request.anchor {
1536 ExplorerAnchor::Realtime(anchor) => Some(anchor),
1537 _ => None,
1538 };
1539 let heap = match request.direction {
1540 Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1541 Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1542 };
1543 Self {
1544 direction: request.direction,
1545 anchor_start_usec,
1546 anchor_stop_usec: None,
1547 limit: request.limit,
1548 heap,
1549 oldest_retained_usec: None,
1550 newest_retained_usec: None,
1551 matched: 0,
1552 skips_before: 0,
1553 skips_after: 0,
1554 shifts: 0,
1555 }
1556 }
1557
1558 fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1559 if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1560 return false;
1561 }
1562 if self.retained_len() < self.limit {
1563 return true;
1564 }
1565 self.oldest_retained_usec
1566 .zip(self.newest_retained_usec)
1567 .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1568 }
1569
1570 fn observe(&mut self, realtime_usec: u64) {
1571 if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
1572 return;
1573 }
1574 self.matched = self.matched.saturating_add(1);
1575 match (&mut self.heap, self.direction) {
1576 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1577 if heap.len() < self.limit {
1578 heap.push(Reverse(realtime_usec));
1579 self.add_retained_bound(realtime_usec);
1580 return;
1581 }
1582 let Some(Reverse(oldest)) = heap.peek().copied() else {
1583 heap.push(Reverse(realtime_usec));
1584 self.refresh_retained_bounds();
1585 return;
1586 };
1587 if realtime_usec < oldest {
1588 self.skips_after = self.skips_after.saturating_add(1);
1589 return;
1590 }
1591 heap.pop();
1592 heap.push(Reverse(realtime_usec));
1593 self.refresh_retained_bounds();
1594 self.shifts = self.shifts.saturating_add(1);
1595 }
1596 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1597 if heap.len() < self.limit {
1598 heap.push(realtime_usec);
1599 self.add_retained_bound(realtime_usec);
1600 return;
1601 }
1602 let Some(newest) = heap.peek().copied() else {
1603 heap.push(realtime_usec);
1604 self.refresh_retained_bounds();
1605 return;
1606 };
1607 if realtime_usec > newest {
1608 self.skips_before = self.skips_before.saturating_add(1);
1609 return;
1610 }
1611 heap.pop();
1612 heap.push(realtime_usec);
1613 self.refresh_retained_bounds();
1614 self.shifts = self.shifts.saturating_add(1);
1615 }
1616 _ => {}
1617 }
1618 }
1619
1620 fn retained_len(&self) -> usize {
1621 match &self.heap {
1622 NetdataPageHeap::Backward(heap) => heap.len(),
1623 NetdataPageHeap::Forward(heap) => heap.len(),
1624 }
1625 }
1626
1627 fn add_retained_bound(&mut self, realtime_usec: u64) {
1628 self.oldest_retained_usec = Some(
1629 self.oldest_retained_usec
1630 .map_or(realtime_usec, |current| current.min(realtime_usec)),
1631 );
1632 self.newest_retained_usec = Some(
1633 self.newest_retained_usec
1634 .map_or(realtime_usec, |current| current.max(realtime_usec)),
1635 );
1636 }
1637
1638 fn refresh_retained_bounds(&mut self) {
1639 let (oldest, newest) = match &self.heap {
1640 NetdataPageHeap::Backward(heap) => heap
1641 .iter()
1642 .map(|Reverse(usec)| *usec)
1643 .fold((None, None), retained_bounds_fold),
1644 NetdataPageHeap::Forward(heap) => heap
1645 .iter()
1646 .copied()
1647 .fold((None, None), retained_bounds_fold),
1648 };
1649 self.oldest_retained_usec = oldest;
1650 self.newest_retained_usec = newest;
1651 }
1652
1653 fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1654 match self.direction {
1655 Direction::Backward => {
1656 if self
1657 .anchor_start_usec
1658 .is_some_and(|anchor| realtime_usec >= anchor)
1659 {
1660 self.skips_before = self.skips_before.saturating_add(1);
1661 return false;
1662 }
1663 if self
1664 .anchor_stop_usec
1665 .is_some_and(|anchor| realtime_usec <= anchor)
1666 {
1667 self.skips_after = self.skips_after.saturating_add(1);
1668 return false;
1669 }
1670 }
1671 Direction::Forward => {
1672 if self
1673 .anchor_start_usec
1674 .is_some_and(|anchor| realtime_usec <= anchor)
1675 {
1676 self.skips_after = self.skips_after.saturating_add(1);
1677 return false;
1678 }
1679 if self
1680 .anchor_stop_usec
1681 .is_some_and(|anchor| realtime_usec >= anchor)
1682 {
1683 self.skips_before = self.skips_before.saturating_add(1);
1684 return false;
1685 }
1686 }
1687 }
1688 true
1689 }
1690
1691 fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
1692 match self.direction {
1693 Direction::Backward => {
1694 if self
1695 .anchor_start_usec
1696 .is_some_and(|anchor| realtime_usec >= anchor)
1697 {
1698 return false;
1699 }
1700 if self
1701 .anchor_stop_usec
1702 .is_some_and(|anchor| realtime_usec <= anchor)
1703 {
1704 return false;
1705 }
1706 }
1707 Direction::Forward => {
1708 if self
1709 .anchor_start_usec
1710 .is_some_and(|anchor| realtime_usec <= anchor)
1711 {
1712 return false;
1713 }
1714 if self
1715 .anchor_stop_usec
1716 .is_some_and(|anchor| realtime_usec >= anchor)
1717 {
1718 return false;
1719 }
1720 }
1721 }
1722 true
1723 }
1724
1725 fn counters(&self) -> NetdataPageCounters {
1726 NetdataPageCounters {
1727 matched: self.matched,
1728 before: self.skips_before,
1729 after: self.skips_after.saturating_add(self.shifts),
1730 }
1731 }
1732
1733 fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
1734 if self.limit == 0 {
1735 return false;
1736 }
1737 match (&self.heap, self.direction) {
1738 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
1739 if heap.len() < self.limit {
1740 return false;
1741 }
1742 heap.peek().is_some_and(|Reverse(oldest)| {
1743 realtime_usec < oldest.saturating_sub(slack_usec)
1744 })
1745 }
1746 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
1747 if heap.len() < self.limit {
1748 return false;
1749 }
1750 heap.peek()
1751 .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
1752 }
1753 _ => false,
1754 }
1755 }
1756}
1757
1758fn retained_bounds_fold(
1759 (oldest, newest): (Option<u64>, Option<u64>),
1760 realtime_usec: u64,
1761) -> (Option<u64>, Option<u64>) {
1762 (
1763 Some(oldest.map_or(realtime_usec, |current| current.min(realtime_usec))),
1764 Some(newest.map_or(realtime_usec, |current| current.max(realtime_usec))),
1765 )
1766}
1767
1768impl CombinedResult {
1769 fn merge(
1770 &mut self,
1771 path: &Path,
1772 result: ExplorerResult,
1773 direction: Direction,
1774 limit: usize,
1775 ) -> Result<()> {
1776 let ExplorerResult {
1777 rows,
1778 facets,
1779 histogram,
1780 stats,
1781 column_fields,
1782 ..
1783 } = result;
1784
1785 if let Some(histogram) = histogram {
1786 merge_histogram(&mut self.histogram, histogram)?;
1787 }
1788
1789 self.merge_stats(stats);
1790 for row in rows {
1791 self.rows.push(LocatedRow {
1792 file_path: path.to_path_buf(),
1793 row,
1794 });
1795 }
1796 for field in column_fields {
1797 if let Ok(field) = String::from_utf8(field) {
1798 self.column_fields.insert(field);
1799 }
1800 }
1801 for (field, values) in facets {
1802 let target = self.facets.entry(field).or_default();
1803 for (value, count) in values {
1804 add_netdata_facet_count(target, &value, count);
1805 }
1806 }
1807 self.sort_and_limit(direction, limit);
1808 Ok(())
1809 }
1810
1811 fn add_column_fields<I>(&mut self, fields: I)
1812 where
1813 I: IntoIterator<Item = String>,
1814 {
1815 self.column_fields.extend(fields);
1816 }
1817
1818 fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
1819 match direction {
1820 Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
1821 Direction::Backward => self
1822 .rows
1823 .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
1824 }
1825 make_row_timestamps_unique(&mut self.rows, direction);
1826 if self.rows.len() > limit {
1827 self.rows.truncate(limit);
1828 }
1829 self.stats.rows_returned = self.rows.len() as u64;
1830 }
1831
1832 fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
1833 if self.rows.is_empty() {
1834 self.stats.rows_returned = 0;
1835 return;
1836 }
1837
1838 let mut rows = Vec::with_capacity(self.rows.len());
1839 for mut located in self.rows.drain(..) {
1840 if !located.row.payloads.is_empty() {
1841 rows.push(located);
1842 continue;
1843 }
1844 match expand_located_row_payloads(&mut located, reader_options) {
1845 Ok(()) => {
1846 self.stats.returned_row_expansions =
1847 self.stats.returned_row_expansions.saturating_add(1);
1848 rows.push(located);
1849 }
1850 Err(err) => {
1851 self.partial = true;
1852 self.file_errors.push(format!(
1853 "{} cursor {}: {err}",
1854 located.file_path.display(),
1855 located.row.cursor
1856 ));
1857 }
1858 }
1859 }
1860 self.rows = rows;
1861 self.stats.rows_returned = self.rows.len() as u64;
1862 }
1863
1864 fn merge_stats(&mut self, stats: ExplorerStats) {
1865 self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
1866 self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
1867 self.stats.facet_rows_matched = self
1868 .stats
1869 .facet_rows_matched
1870 .saturating_add(stats.facet_rows_matched);
1871 self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
1872 self.stats.rows_unsampled = self
1873 .stats
1874 .rows_unsampled
1875 .saturating_add(stats.rows_unsampled);
1876 self.stats.rows_estimated = self
1877 .stats
1878 .rows_estimated
1879 .saturating_add(stats.rows_estimated);
1880 self.stats.sampling_sampled = self
1881 .stats
1882 .sampling_sampled
1883 .saturating_add(stats.sampling_sampled);
1884 self.stats.sampling_unsampled = self
1885 .stats
1886 .sampling_unsampled
1887 .saturating_add(stats.sampling_unsampled);
1888 self.stats.sampling_estimated = self
1889 .stats
1890 .sampling_estimated
1891 .saturating_add(stats.sampling_estimated);
1892 if stats.last_realtime_usec > self.stats.last_realtime_usec {
1893 self.stats.last_realtime_usec = stats.last_realtime_usec;
1894 }
1895 if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
1896 self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
1897 }
1898 self.stats.data_refs_seen = self
1899 .stats
1900 .data_refs_seen
1901 .saturating_add(stats.data_refs_seen);
1902 self.stats.data_refs_skipped = self
1903 .stats
1904 .data_refs_skipped
1905 .saturating_add(stats.data_refs_skipped);
1906 self.stats.data_payloads_loaded = self
1907 .stats
1908 .data_payloads_loaded
1909 .saturating_add(stats.data_payloads_loaded);
1910 self.stats.data_objects_classified = self
1911 .stats
1912 .data_objects_classified
1913 .saturating_add(stats.data_objects_classified);
1914 self.stats.data_cache_hits = self
1915 .stats
1916 .data_cache_hits
1917 .saturating_add(stats.data_cache_hits);
1918 self.stats.data_cache_misses = self
1919 .stats
1920 .data_cache_misses
1921 .saturating_add(stats.data_cache_misses);
1922 self.stats.payloads_decompressed = self
1923 .stats
1924 .payloads_decompressed
1925 .saturating_add(stats.payloads_decompressed);
1926 self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
1927 self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
1928 self.stats.histogram_updates = self
1929 .stats
1930 .histogram_updates
1931 .saturating_add(stats.histogram_updates);
1932 self.stats.returned_row_expansions = self
1933 .stats
1934 .returned_row_expansions
1935 .saturating_add(stats.returned_row_expansions);
1936 self.stats.early_stop_opportunities = self
1937 .stats
1938 .early_stop_opportunities
1939 .saturating_add(stats.early_stop_opportunities);
1940 self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
1941 }
1942
1943 fn add_zero_count_facet_values(
1944 &mut self,
1945 vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
1946 ) {
1947 for (field, values) in vocabulary {
1948 let target = self.facets.entry(field.clone()).or_default();
1949 for value in values.keys() {
1950 add_netdata_facet_count(target, value, 0);
1951 }
1952 }
1953 }
1954
1955 fn add_zero_count_facet_values_from_files(
1956 &mut self,
1957 fields: &[Vec<u8>],
1958 reader_options: ReaderOptions,
1959 ) {
1960 for path in &self.matched_paths {
1961 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
1962 continue;
1963 };
1964 for field in fields {
1965 let Ok(field_name) = std::str::from_utf8(field) else {
1966 continue;
1967 };
1968 let Ok(values) = reader.query_unique(field_name) else {
1969 continue;
1970 };
1971 if values.is_empty() {
1972 continue;
1973 }
1974 let target = self.facets.entry(field.clone()).or_default();
1975 for value in values {
1976 add_netdata_facet_count(target, &value, 0);
1977 }
1978 }
1979 }
1980 }
1981
1982 fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
1983 let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
1984 if let Some(histogram) = &request.histogram {
1985 report_fields.insert(histogram.as_bytes().to_vec());
1986 }
1987 for filter in &request.filters {
1988 if !report_fields.contains(&filter.field) {
1989 continue;
1990 }
1991 let target = self.facets.entry(filter.field.clone()).or_default();
1992 for value in &filter.values {
1993 add_netdata_facet_count(target, value, 0);
1994 }
1995 }
1996 }
1997
1998 fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
1999 string_fields(&self.reportable_facet_fields_bytes(requested))
2000 }
2001
2002 fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2003 requested
2004 .iter()
2005 .filter(|field| {
2006 self.facets
2007 .get(*field)
2008 .is_some_and(facet_group_is_reportable)
2009 })
2010 .cloned()
2011 .collect()
2012 }
2013}
2014
2015fn netdata_function_error(status: u64, message: &str) -> Value {
2016 json!({
2017 "status": status,
2018 "errorMessage": message,
2019 })
2020}
2021
2022fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2023 if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2024 return Value::String("OK".to_string());
2025 }
2026
2027 let total = stats
2028 .rows_examined
2029 .saturating_add(stats.rows_unsampled)
2030 .saturating_add(stats.rows_estimated)
2031 .max(1);
2032 let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2033 let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2034 let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2035
2036 let mut title = String::new();
2037 let mut description = String::new();
2038 let mut status = "notice";
2039 if timed_out {
2040 title.push_str("Query timed-out, incomplete data. ");
2041 description.push_str(
2042 "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2043 );
2044 status = "warning";
2045 }
2046 if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2047 title.push_str(&format!("{real_percent:.2}% real data"));
2048 description.push_str(&format!(
2049 "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2050 ));
2051 }
2052 if stats.rows_unsampled != 0 {
2053 title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2054 description.push_str(&format!(
2055 "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2056 ));
2057 }
2058 if stats.rows_estimated != 0 {
2059 title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2060 description.push_str(&format!(
2061 "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2062 ));
2063 }
2064
2065 json!({
2066 "title": title,
2067 "status": status,
2068 "description": description,
2069 })
2070}
2071
2072fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2073 let mut stats = completed.clone();
2074 stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2075 stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2076 stats.facet_rows_matched = stats
2077 .facet_rows_matched
2078 .saturating_add(current.facet_rows_matched);
2079 stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2080 stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2081 stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2082 stats.sampling_sampled = stats
2083 .sampling_sampled
2084 .saturating_add(current.sampling_sampled);
2085 stats.sampling_unsampled = stats
2086 .sampling_unsampled
2087 .saturating_add(current.sampling_unsampled);
2088 stats.sampling_estimated = stats
2089 .sampling_estimated
2090 .saturating_add(current.sampling_estimated);
2091 if current.last_realtime_usec > stats.last_realtime_usec {
2092 stats.last_realtime_usec = current.last_realtime_usec;
2093 }
2094 if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2095 stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2096 }
2097 stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2098 stats.data_refs_skipped = stats
2099 .data_refs_skipped
2100 .saturating_add(current.data_refs_skipped);
2101 stats.data_payloads_loaded = stats
2102 .data_payloads_loaded
2103 .saturating_add(current.data_payloads_loaded);
2104 stats.data_objects_classified = stats
2105 .data_objects_classified
2106 .saturating_add(current.data_objects_classified);
2107 stats.data_cache_hits = stats
2108 .data_cache_hits
2109 .saturating_add(current.data_cache_hits);
2110 stats.data_cache_misses = stats
2111 .data_cache_misses
2112 .saturating_add(current.data_cache_misses);
2113 stats.payloads_decompressed = stats
2114 .payloads_decompressed
2115 .saturating_add(current.payloads_decompressed);
2116 stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2117 stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2118 stats.histogram_updates = stats
2119 .histogram_updates
2120 .saturating_add(current.histogram_updates);
2121 stats.returned_row_expansions = stats
2122 .returned_row_expansions
2123 .saturating_add(current.returned_row_expansions);
2124 stats.early_stop_opportunities = stats
2125 .early_stop_opportunities
2126 .saturating_add(current.early_stop_opportunities);
2127 stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2128 stats
2129}
2130
2131struct Columns {
2132 order: Vec<String>,
2133 map: Map<String, Value>,
2134}
2135
2136struct QueryResponseArtifacts {
2137 reportable_facet_field_names: Vec<String>,
2138 columns: Columns,
2139 data: Vec<Value>,
2140 facets: Value,
2141 histogram: Option<Value>,
2142 message: Value,
2143 items: Value,
2144}
2145
2146fn base_query_response(
2147 request: &NetdataRequest,
2148 combined: &CombinedResult,
2149 artifacts: &QueryResponseArtifacts,
2150) -> Value {
2151 json!({
2152 "_request": &request.echo,
2153 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2154 "_journal_files": {
2155 "matched": combined.matched_files,
2156 "skipped": combined.skipped_files,
2157 "errors": &combined.file_errors,
2158 },
2159 "status": 200,
2160 "partial": combined.partial,
2161 "type": "table",
2162 "show_ids": true,
2163 "has_history": true,
2164 "pagination": {
2165 "enabled": true,
2166 "key": "anchor",
2167 "column": "timestamp",
2168 "units": "timestamp_usec",
2169 },
2170 "columns": &artifacts.columns.map,
2171 "data": &artifacts.data,
2172 "_stats": {
2173 "sdk_explorer": &combined.stats,
2174 },
2175 "expires": if request.data_only {
2176 unix_now_seconds().saturating_add(3600)
2177 } else {
2178 0
2179 }
2180 })
2181}
2182
2183fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2184 let unsampled = combined.stats.rows_unsampled;
2185 let estimated = combined.stats.rows_estimated;
2186 let fallback_rows_after_returned =
2187 response_fallback_rows_after_returned(&combined.stats, returned);
2188 let page_counters = combined
2189 .page_counters
2190 .unwrap_or_else(|| NetdataPageCounters {
2191 matched: combined.stats.rows_matched,
2192 before: 0,
2193 after: fallback_rows_after_returned,
2194 });
2195 json!({
2196 "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2197 "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2198 "unsampled": unsampled,
2199 "estimated": estimated,
2200 "returned": returned,
2201 "max_to_return": request.limit as u64,
2202 "before": page_counters.before,
2203 "after": page_counters.after,
2204 })
2205}
2206
2207fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2208 let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2209 stats.rows_examined
2210 } else {
2211 stats.rows_matched
2212 };
2213 source_rows.saturating_sub(returned)
2214}
2215
2216fn add_last_modified_if_needed(
2217 object: &mut Map<String, Value>,
2218 request: &NetdataRequest,
2219 combined: &CombinedResult,
2220) {
2221 if !request.data_only || request.tail {
2222 object.insert(
2223 "last_modified".to_string(),
2224 Value::from(combined.stats.last_realtime_usec),
2225 );
2226 }
2227}
2228
2229fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2230 if combined.sampling_enabled {
2231 object.insert(
2232 "_sampling".to_string(),
2233 json!({
2234 "enabled": true,
2235 "sampled": combined.stats.sampling_sampled,
2236 "unsampled": combined.stats.sampling_unsampled,
2237 "estimated": combined.stats.sampling_estimated,
2238 }),
2239 );
2240 }
2241}
2242
2243fn add_analysis_outputs_if_needed(
2244 object: &mut Map<String, Value>,
2245 request: &NetdataRequest,
2246 artifacts: QueryResponseArtifacts,
2247) {
2248 if !request.data_only || request.delta {
2249 let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2250 object.insert(facets_key.to_string(), artifacts.facets);
2251 object.insert(
2252 histogram_key.to_string(),
2253 artifacts.histogram.unwrap_or(Value::Null),
2254 );
2255 object.insert(items_key.to_string(), artifacts.items);
2256 }
2257}
2258
2259fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
2260 if data_only {
2261 ("facets_delta", "histogram_delta", "items_delta")
2262 } else {
2263 ("facets", "histogram", "items")
2264 }
2265}
2266
2267fn merge_histogram(
2268 target: &mut Option<ExplorerHistogram>,
2269 source: ExplorerHistogram,
2270) -> Result<()> {
2271 let Some(target) = target else {
2272 *target = Some(source);
2273 return Ok(());
2274 };
2275 if target.field != source.field
2276 || target.buckets.len() != source.buckets.len()
2277 || target
2278 .buckets
2279 .iter()
2280 .zip(source.buckets.iter())
2281 .any(|(target_bucket, source_bucket)| {
2282 target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2283 || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2284 })
2285 {
2286 return Err(SdkError::Unsupported(
2287 "inconsistent Netdata histogram bucket shape",
2288 ));
2289 }
2290 for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2291 let Some(target_bucket) = target.buckets.get_mut(index) else {
2292 return Err(SdkError::Unsupported(
2293 "inconsistent Netdata histogram bucket shape",
2294 ));
2295 };
2296 for (value, count) in source_bucket.values {
2297 *target_bucket.values.entry(value).or_default() += count;
2298 }
2299 }
2300 Ok(())
2301}
2302
2303fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
2304 values
2305 .iter()
2306 .any(|(value, _count)| !value.is_empty() && value.as_slice() != b"-")
2307}
2308
2309fn netdata_facet_value(value: &[u8]) -> &[u8] {
2310 if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2311 &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2312 } else {
2313 value
2314 }
2315}
2316
2317fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2318 *target
2319 .entry(netdata_facet_value(value).to_vec())
2320 .or_default() += count;
2321}
2322
2323fn not_modified_before_scan_response(
2324 request: &NetdataRequest,
2325 selected: &SelectedJournalFiles,
2326) -> Option<Value> {
2327 if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2328 Some(netdata_function_error(
2329 304,
2330 "No new data since the previous call.",
2331 ))
2332 } else {
2333 None
2334 }
2335}
2336
2337fn should_collect_unfiltered_facet_vocabulary(
2338 request: &NetdataRequest,
2339 combined: &CombinedResult,
2340) -> bool {
2341 !request.data_only && !combined.partial && !request.filters.is_empty()
2342}
2343
2344fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2345 ProgressContext {
2346 current_file: file_index + 1,
2347 total_files,
2348 started,
2349 }
2350}
2351
2352fn emit_netdata_progress(
2353 options: &mut NetdataFunctionRunOptions<'_>,
2354 progress: NetdataFunctionProgress,
2355) {
2356 if let Some(callback) = options.progress_callback.as_deref_mut() {
2357 callback(progress);
2358 }
2359}
2360
2361fn emit_progress_for_combined(
2362 options: &mut NetdataFunctionRunOptions<'_>,
2363 combined: &CombinedResult,
2364 context: ProgressContext,
2365) {
2366 emit_netdata_progress(
2367 options,
2368 NetdataFunctionProgress {
2369 current_file: context.current_file,
2370 total_files: context.total_files,
2371 matched_files: combined.matched_files,
2372 skipped_files: combined.skipped_files,
2373 stats: combined.stats.clone(),
2374 elapsed: context.started.elapsed(),
2375 },
2376 );
2377}
2378
2379fn emit_explorer_progress(
2380 options: &mut NetdataFunctionRunOptions<'_>,
2381 combined: &CombinedResult,
2382 progress: ExplorerProgress,
2383 context: ProgressContext,
2384) {
2385 let stats = merged_progress_stats(&combined.stats, &progress.stats);
2386 if let Some(callback) = options.progress_callback.as_deref_mut() {
2387 callback(NetdataFunctionProgress {
2388 current_file: context.current_file,
2389 total_files: context.total_files,
2390 matched_files: combined.matched_files,
2391 skipped_files: combined.skipped_files,
2392 stats,
2393 elapsed: context.started.elapsed(),
2394 });
2395 }
2396}
2397
2398fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2399 options
2400 .cancellation_callback
2401 .is_some_and(|is_cancelled| is_cancelled())
2402}
2403
2404fn should_stop_before_file(
2405 combined: &mut CombinedResult,
2406 deadline: Option<Instant>,
2407 options: &NetdataFunctionRunOptions<'_>,
2408) -> bool {
2409 if request_cancelled(options) {
2410 combined.partial = true;
2411 combined.cancelled = true;
2412 return true;
2413 }
2414 if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2415 combined.partial = true;
2416 combined.timed_out = true;
2417 return true;
2418 }
2419 false
2420}
2421
2422fn collect_column_fields_for_file(
2423 reader: &mut FileReader,
2424 request: &NetdataRequest,
2425 path: &Path,
2426 combined: &mut CombinedResult,
2427) {
2428 if request.data_only {
2429 return;
2430 }
2431 match reader.enumerate_fields_indexed() {
2432 Ok(fields) => combined.add_column_fields(fields),
2433 Err(err) => combined.file_errors.push(format!(
2434 "{}: FIELD index enumeration failed: {err}",
2435 path.display()
2436 )),
2437 }
2438}
2439
2440fn record_explore_result(
2441 result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2442 path: &Path,
2443 combined: &mut CombinedResult,
2444) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2445 match result {
2446 Ok(result) => Some(result),
2447 Err(err) => {
2448 combined.skipped_files = combined.skipped_files.saturating_add(1);
2449 combined
2450 .file_errors
2451 .push(format!("{}: {err}", path.display()));
2452 None
2453 }
2454 }
2455}
2456
2457fn delta_scan_can_stop(
2458 request: &NetdataRequest,
2459 page_window: &RefCell<NetdataPageWindow>,
2460 realtime_usec: u64,
2461 rows_matched: u64,
2462 realtime_delta_usec: u64,
2463) -> bool {
2464 let mut page_window = page_window.borrow_mut();
2465 page_window.observe(realtime_usec);
2466 request.data_only
2467 && request.delta
2468 && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2469 && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2470}
2471
2472#[allow(clippy::too_many_arguments)]
2473fn finish_explored_file(
2474 options: &mut NetdataFunctionRunOptions<'_>,
2475 request: &NetdataRequest,
2476 file: &SelectedJournalFile,
2477 query: &ExplorerQuery,
2478 result: ExplorerResult,
2479 stop_reason: Option<ExplorerStopReason>,
2480 combined: &mut CombinedResult,
2481 files: &[SelectedJournalFile],
2482 file_index: usize,
2483 progress: ProgressContext,
2484) -> Result<bool> {
2485 update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2486 combined.merge(&file.path, result, query.direction, request.limit)?;
2487 emit_progress_for_combined(options, combined, progress);
2488 if request_cancelled(options) {
2489 combined.partial = true;
2490 combined.cancelled = true;
2491 return Ok(true);
2492 }
2493 if let Some(reason) = stop_reason {
2494 combined.partial = true;
2495 match reason {
2496 ExplorerStopReason::TimedOut => combined.timed_out = true,
2497 ExplorerStopReason::Cancelled => combined.cancelled = true,
2498 }
2499 return Ok(true);
2500 }
2501 Ok(request.data_only
2502 && !request.delta
2503 && !request.tail
2504 && combined.rows.len() >= request.limit
2505 && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2506}
2507
2508fn file_metadata(
2509 options: &NetdataFunctionRunOptions<'_>,
2510 path: &Path,
2511) -> Option<NetdataJournalFileMetadata> {
2512 options
2513 .state
2514 .as_deref()
2515 .and_then(|state| state.file_metadata(path))
2516}
2517
2518fn update_learned_realtime_delta(
2519 options: &mut NetdataFunctionRunOptions<'_>,
2520 path: &Path,
2521 order: &JournalFileOrderInfo,
2522 stats: &ExplorerStats,
2523) {
2524 let learned_delta = stats.max_source_realtime_delta_usec;
2525 if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2526 return;
2527 }
2528 let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2529 if learned_delta <= order.journal_vs_realtime_delta_usec {
2530 return;
2531 }
2532 if let Some(state) = options.state.as_deref_mut() {
2533 state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2534 }
2535}
2536
2537fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2538 delta_usec
2539 .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2540 .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2541}
2542
2543#[cfg(test)]
2544fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
2545 if header.tail_entry_realtime == 0 {
2546 return true;
2547 }
2548
2549 let first = header
2550 .head_entry_realtime
2551 .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2552 let last = header
2553 .tail_entry_realtime
2554 .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2555
2556 if request
2557 .after_realtime_usec
2558 .is_some_and(|after| last < after)
2559 {
2560 return false;
2561 }
2562 if request
2563 .before_realtime_usec
2564 .is_some_and(|before| first > before)
2565 {
2566 return false;
2567 }
2568
2569 true
2570}
2571
2572#[derive(Debug)]
2573struct SelectedJournalFile {
2574 path: PathBuf,
2575 order: JournalFileOrderInfo,
2576}
2577
2578#[derive(Debug, Default)]
2579struct SelectedJournalFiles {
2580 files: Vec<SelectedJournalFile>,
2581 files_are_newer: bool,
2582}
2583
2584fn select_journal_files_for_request(
2585 paths: Vec<PathBuf>,
2586 request: &NetdataRequest,
2587 reader_options: ReaderOptions,
2588 options: &NetdataFunctionRunOptions<'_>,
2589) -> SelectedJournalFiles {
2590 let mut selected = Vec::new();
2591 for path in paths {
2592 let metadata = file_metadata(options, &path);
2593 if !request.matches_source(&path, metadata.as_ref()) {
2594 continue;
2595 }
2596 let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2597 if !journal_file_order_may_overlap_request(&order, request) {
2598 continue;
2599 }
2600 selected.push(SelectedJournalFile { path, order });
2601 }
2602 selected.sort_by(|left, right| {
2603 compare_journal_file_order(&left.order, &right.order, request.direction)
2604 .then_with(|| left.path.cmp(&right.path))
2605 });
2606 let files_are_newer = selected
2607 .iter()
2608 .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2609 SelectedJournalFiles {
2610 files: selected,
2611 files_are_newer,
2612 }
2613}
2614
2615fn remaining_files_cannot_affect_data_page(
2616 combined: &CombinedResult,
2617 request: &NetdataRequest,
2618 files: &[SelectedJournalFile],
2619 next_file_index: usize,
2620) -> bool {
2621 let Some(next) = files.get(next_file_index) else {
2622 return true;
2623 };
2624 let slack = next.order.journal_vs_realtime_delta_usec;
2625 match request.direction {
2626 Direction::Backward => {
2627 let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2628 else {
2629 return false;
2630 };
2631 next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2632 }
2633 Direction::Forward => {
2634 let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2635 else {
2636 return false;
2637 };
2638 next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2639 }
2640 }
2641}
2642
2643fn journal_file_order_may_overlap_request(
2644 info: &JournalFileOrderInfo,
2645 request: &NetdataRequest,
2646) -> bool {
2647 if info.msg_last_realtime_usec == 0 {
2648 return true;
2649 }
2650
2651 let first = info
2652 .msg_first_realtime_usec
2653 .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2654 let last = info
2655 .msg_last_realtime_usec
2656 .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2657
2658 if request
2659 .after_realtime_usec
2660 .is_some_and(|after| last < after)
2661 {
2662 return false;
2663 }
2664 if request
2665 .before_realtime_usec
2666 .is_some_and(|before| first > before)
2667 {
2668 return false;
2669 }
2670
2671 true
2672}
2673
2674fn collect_boot_first_realtime(
2675 paths: &[PathBuf],
2676 reader_options: ReaderOptions,
2677 needed_boot_ids: &BTreeSet<Vec<u8>>,
2678) -> BTreeMap<Vec<u8>, u64> {
2679 let mut out = BTreeMap::new();
2680 if needed_boot_ids.is_empty() {
2681 return out;
2682 }
2683 for path in paths {
2684 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2685 continue;
2686 };
2687 let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
2688 continue;
2689 };
2690 for boot_id in boot_ids {
2691 if !needed_boot_ids.contains(&boot_id) {
2692 continue;
2693 }
2694 let mut match_payload = b"_BOOT_ID=".to_vec();
2695 match_payload.extend_from_slice(&boot_id);
2696 reader.flush_matches();
2697 reader.add_match(&match_payload);
2698 reader.seek_head();
2699 if !reader.next().unwrap_or(false) {
2700 continue;
2701 }
2702 if let Ok(realtime) = reader.get_realtime_usec() {
2703 record_boot_first_realtime(&mut out, boot_id, realtime);
2704 }
2705 }
2706 reader.flush_matches();
2707 }
2708 out
2709}
2710
2711fn response_boot_ids(
2712 column_order: &[String],
2713 rows: &[LocatedRow],
2714 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2715 histogram: Option<&ExplorerHistogram>,
2716) -> BTreeSet<Vec<u8>> {
2717 let mut boot_ids = BTreeSet::new();
2718 let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
2719 if row_needs_boot_id {
2720 for row in rows {
2721 if let Some(values) = row_fields(row).get("_BOOT_ID") {
2722 boot_ids.extend(values.iter().cloned());
2723 }
2724 }
2725 }
2726 if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
2727 boot_ids.extend(
2728 values
2729 .keys()
2730 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
2731 .cloned(),
2732 );
2733 }
2734 if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
2735 for bucket in &histogram.buckets {
2736 boot_ids.extend(
2737 bucket
2738 .values
2739 .keys()
2740 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
2741 .cloned(),
2742 );
2743 }
2744 }
2745 boot_ids
2746}
2747
2748fn record_boot_first_realtime(
2749 target: &mut BTreeMap<Vec<u8>, u64>,
2750 boot_id: Vec<u8>,
2751 realtime_usec: u64,
2752) {
2753 let existing = target.entry(boot_id).or_insert(realtime_usec);
2754 if realtime_usec < *existing {
2755 *existing = realtime_usec;
2756 }
2757}
2758
2759fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
2760 let mut fields = BTreeMap::new();
2761 for payload in &row.row.payloads {
2762 let Some((field, value)) = split_payload(payload) else {
2763 continue;
2764 };
2765 fields
2766 .entry(String::from_utf8_lossy(field).into_owned())
2767 .or_insert_with(Vec::new)
2768 .push(value.to_vec());
2769 }
2770 fields.insert(
2771 "ND_JOURNAL_FILE".to_string(),
2772 vec![row.file_path.display().to_string().into_bytes()],
2773 );
2774 if !fields.contains_key("ND_JOURNAL_PROCESS") {
2775 let process = dynamic_process_name(&fields);
2776 if !process.is_empty() {
2777 fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
2778 }
2779 }
2780 fields
2781}
2782
2783fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
2784 let base = first_value(fields, "CONTAINER_NAME")
2785 .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
2786 .or_else(|| first_value(fields, "_COMM"))
2787 .map(|value| String::from_utf8_lossy(value).into_owned())
2788 .unwrap_or_default();
2789 if base.is_empty() {
2790 return "-".to_string();
2791 }
2792 let pid = first_value(fields, "_PID").map(|value| String::from_utf8_lossy(value).into_owned());
2793 match pid {
2794 Some(pid) if !pid.is_empty() => format!("{base}[{pid}]"),
2795 _ => base,
2796 }
2797}
2798
2799fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
2800 let mut last_from = 0u64;
2801 let mut last_to = 0u64;
2802 let mut initialized = false;
2803 for row in rows {
2804 let timestamp = row.row.realtime_usec;
2805 if initialized && timestamp >= last_from && timestamp <= last_to {
2806 match direction {
2807 Direction::Backward => {
2808 last_from = last_from.saturating_sub(1);
2809 row.row.realtime_usec = last_from;
2810 }
2811 Direction::Forward => {
2812 last_to = last_to.saturating_add(1);
2813 row.row.realtime_usec = last_to;
2814 }
2815 }
2816 } else {
2817 last_from = timestamp;
2818 last_to = timestamp;
2819 initialized = true;
2820 }
2821 }
2822}
2823
2824fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
2825 fields
2826 .get(field)
2827 .and_then(|values| values.first())
2828 .map(Vec::as_slice)
2829}
2830
2831fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
2832 let split = payload.iter().position(|byte| *byte == b'=')?;
2833 Some((&payload[..split], &payload[split + 1..]))
2834}
2835
2836fn column_metadata(key: &str, index: usize) -> Value {
2837 let (visible, filter, full_width) = match key {
2838 "timestamp" => (true, "range", false),
2839 "rowOptions" => (false, "none", false),
2840 "_HOSTNAME" => (true, "facet", false),
2841 "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
2842 "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
2843 _ if systemd_column_is_facet(key) => (false, "facet", false),
2844 _ => (false, "none", false),
2845 };
2846 let column_type = if key == "timestamp" {
2847 "timestamp"
2848 } else if key == "rowOptions" {
2849 "none"
2850 } else {
2851 "string"
2852 };
2853 let visualization = if key == "rowOptions" {
2854 "rowOptions"
2855 } else {
2856 "value"
2857 };
2858 let mut metadata = json!({
2859 "index": index,
2860 "unique_key": key == "timestamp",
2861 "name": if key == "timestamp" { "Timestamp" } else { key },
2862 "visible": visible,
2863 "type": column_type,
2864 "visualization": visualization,
2865 "value_options": {
2866 "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
2867 "decimal_points": 0,
2868 "default_value": if key == "timestamp" || key == "rowOptions" {
2869 Value::Null
2870 } else {
2871 Value::String("-".to_string())
2872 },
2873 },
2874 "sort": "ascending",
2875 "sortable": false,
2876 "sticky": false,
2877 "summary": "count",
2878 "filter": filter,
2879 "full_width": full_width,
2880 "wrap": key != "rowOptions",
2881 "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
2882 });
2883 if key == "rowOptions" {
2884 if let Some(object) = metadata.as_object_mut() {
2885 object.insert("dummy".to_string(), Value::Bool(true));
2886 }
2887 }
2888 metadata
2889}
2890
2891fn systemd_column_is_facet(key: &str) -> bool {
2892 if key == "MESSAGE_ID" {
2893 return true;
2894 }
2895 if key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__") {
2896 return false;
2897 }
2898 true
2899}
2900
2901fn sort_facet_options(field: &str, options: &mut [Value]) {
2902 options.sort_by(|left, right| {
2903 let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
2904 let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
2905 if field == "PRIORITY" {
2906 return parse_priority(left_id).cmp(&parse_priority(right_id));
2907 }
2908 let left_count = left
2909 .get("count")
2910 .and_then(Value::as_u64)
2911 .unwrap_or_default();
2912 let right_count = right
2913 .get("count")
2914 .and_then(Value::as_u64)
2915 .unwrap_or_default();
2916 right_count
2917 .cmp(&left_count)
2918 .then_with(|| left_id.cmp(right_id))
2919 });
2920}
2921
2922fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
2923 let bytes = query.as_bytes();
2924 let mut index = 0usize;
2925 let mut terms = Vec::new();
2926 let mut positives = Vec::new();
2927 let mut negatives = Vec::new();
2928
2929 while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
2930 push_fts_pattern(
2931 pattern,
2932 negative,
2933 &mut terms,
2934 &mut positives,
2935 &mut negatives,
2936 );
2937 }
2938
2939 (terms, positives, negatives)
2940}
2941
2942fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
2943 while *index < bytes.len() {
2944 skip_fts_separators(bytes, index);
2945 let negative = consume_fts_negative_marker(bytes, index);
2946 let pattern = read_fts_pattern(bytes, index);
2947 if !pattern.is_empty() {
2948 return Some((pattern, negative));
2949 }
2950 }
2951 None
2952}
2953
2954fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
2955 while *index < bytes.len() && bytes[*index] == b'|' {
2956 *index += 1;
2957 }
2958}
2959
2960fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
2961 if bytes.get(*index) == Some(&b'!') {
2962 *index += 1;
2963 true
2964 } else {
2965 false
2966 }
2967}
2968
2969fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
2970 let mut pattern = Vec::new();
2971 let mut escaped = false;
2972 while *index < bytes.len() {
2973 let byte = bytes[*index];
2974 *index += 1;
2975 if byte == b'\\' && !escaped {
2976 escaped = true;
2977 continue;
2978 }
2979 if byte == b'|' && !escaped {
2980 break;
2981 }
2982 pattern.push(byte);
2983 escaped = false;
2984 }
2985 pattern
2986}
2987
2988fn push_fts_pattern(
2989 pattern: Vec<u8>,
2990 negative: bool,
2991 terms: &mut Vec<ExplorerFtsPattern>,
2992 positives: &mut Vec<Vec<u8>>,
2993 negatives: &mut Vec<Vec<u8>>,
2994) {
2995 terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
2996 if negative {
2997 negatives.push(pattern);
2998 } else {
2999 positives.push(pattern);
3000 }
3001}
3002
3003fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3004 let Some(Value::Object(selections)) = value else {
3005 return Vec::new();
3006 };
3007 let mut filters = Vec::new();
3008 for (field, values) in selections {
3009 if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3010 continue;
3011 }
3012 let Some(values) = parse_string_array(Some(values)) else {
3013 continue;
3014 };
3015 filters.push(ExplorerFilter::new(
3016 field.as_bytes().to_vec(),
3017 values
3018 .into_iter()
3019 .map(|value| normalize_filter_value(field, &value)),
3020 ));
3021 }
3022 filters
3023}
3024
3025#[derive(Debug, Clone)]
3026struct SourceSelection {
3027 source_type: u64,
3028 exact_sources: Vec<String>,
3029}
3030
3031fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3032 let mut selection = SourceSelection {
3033 source_type: SOURCE_TYPE_ALL,
3034 exact_sources: Vec::new(),
3035 };
3036 let Some(Value::Object(selections)) = value else {
3037 return selection;
3038 };
3039 let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3040 return selection;
3041 };
3042 selection.source_type = 0;
3043 for value in values {
3044 match source_type_for_name(&value) {
3045 Some(source_type) => selection.source_type |= source_type,
3046 None => selection.exact_sources.push(value),
3047 }
3048 }
3049 selection
3050}
3051
3052fn source_type_for_name(value: &str) -> Option<u64> {
3053 match value {
3054 "all" => Some(SOURCE_TYPE_ALL),
3055 "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3056 "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3057 "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3058 "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3059 "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3060 "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3061 _ => None,
3062 }
3063}
3064
3065fn journal_file_source_type(path: &Path) -> u64 {
3066 let text = path.to_string_lossy();
3067 let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3068 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3069 };
3070 if text.contains("/remote/") {
3071 return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3072 }
3073 if local_namespace_source_name(path).is_some() {
3074 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3075 }
3076 if name.starts_with("system") {
3077 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3078 }
3079 if name.starts_with("user") {
3080 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3081 }
3082 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3083}
3084
3085fn local_namespace_source_name(path: &Path) -> Option<String> {
3086 let parent = path.parent()?.file_name()?.to_str()?;
3087 let (_, namespace) = parent.rsplit_once('.')?;
3088 (!namespace.is_empty()).then(|| format!("namespace-{namespace}"))
3089}
3090
3091fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3092 let text = path.to_string_lossy();
3093 if text.contains("/remote/") {
3094 let name = path.file_name()?.to_str()?;
3095 let source = name
3096 .split_once('@')
3097 .map(|(prefix, _)| prefix)
3098 .unwrap_or_else(|| {
3099 name.strip_suffix(".journal~.zst")
3100 .or_else(|| name.strip_suffix(".journal.zst"))
3101 .or_else(|| name.strip_suffix(".journal~"))
3102 .or_else(|| name.strip_suffix(".journal"))
3103 .unwrap_or(name)
3104 });
3105 return source.starts_with("remote-").then(|| source.to_string());
3106 }
3107 local_namespace_source_name(path)
3108}
3109
3110fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3111 if field == "PRIORITY" {
3112 if let Some(priority) = priority_name_to_number(value) {
3113 return priority.as_bytes().to_vec();
3114 }
3115 }
3116 value.as_bytes().to_vec()
3117}
3118
3119fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3120 let Value::Array(items) = value? else {
3121 return None;
3122 };
3123 Some(
3124 items
3125 .iter()
3126 .filter_map(Value::as_str)
3127 .map(ToOwned::to_owned)
3128 .collect(),
3129 )
3130}
3131
3132fn request_direction(object: &Map<String, Value>) -> Direction {
3133 match get_str(object, "direction").unwrap_or("backward") {
3134 "forward" | "forwards" | "next" => Direction::Forward,
3135 _ => Direction::Backward,
3136 }
3137}
3138
3139fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3140 data_only && get_bool(object, "delta").unwrap_or(false)
3141}
3142
3143fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3144 data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3145}
3146
3147fn request_anchor_and_direction(
3148 object: &Map<String, Value>,
3149 tail: bool,
3150 direction: Direction,
3151 after_realtime_usec: Option<u64>,
3152 before_realtime_usec: Option<u64>,
3153) -> (ExplorerAnchor, Direction) {
3154 let anchor = get_u64(object, "anchor")
3155 .map(normalize_timestamp_to_usec)
3156 .map(ExplorerAnchor::Realtime)
3157 .unwrap_or(ExplorerAnchor::Auto);
3158 if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3159 return (anchor, Direction::Backward);
3160 }
3161 if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3162 (ExplorerAnchor::Auto, Direction::Backward)
3163 } else {
3164 (anchor, direction)
3165 }
3166}
3167
3168fn anchor_outside_window(
3169 anchor: ExplorerAnchor,
3170 after_realtime_usec: Option<u64>,
3171 before_realtime_usec: Option<u64>,
3172) -> bool {
3173 let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3174 return false;
3175 };
3176 after_realtime_usec.is_some_and(|after| anchor_usec < after)
3177 || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3178}
3179
3180fn request_limit(object: &Map<String, Value>) -> usize {
3181 get_u64(object, "last")
3182 .filter(|value| *value != 0)
3183 .map(|value| value as usize)
3184 .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3185}
3186
3187fn request_facets(
3188 requested_facets: &Option<Vec<String>>,
3189 config: &NetdataFunctionConfig,
3190) -> Vec<Vec<u8>> {
3191 requested_facets
3192 .clone()
3193 .unwrap_or_else(|| config.default_facets.clone())
3194 .into_iter()
3195 .map(Vec::from)
3196 .collect()
3197}
3198
3199fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3200 get_str(object, "histogram")
3201 .filter(|histogram| !histogram.is_empty())
3202 .map(ToOwned::to_owned)
3203}
3204
3205fn request_histogram_or_default(
3206 requested_histogram: &Option<String>,
3207 config: &NetdataFunctionConfig,
3208) -> Option<String> {
3209 requested_histogram
3210 .clone()
3211 .or_else(|| config.default_histogram.clone())
3212}
3213
3214fn request_query(object: &Map<String, Value>) -> Option<String> {
3215 get_str(object, "query")
3216 .filter(|query| !query.is_empty())
3217 .map(ToOwned::to_owned)
3218}
3219
3220fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3221 object.get(key).and_then(Value::as_bool)
3222}
3223
3224fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3225 object.get(key).and_then(Value::as_i64)
3226}
3227
3228fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3229 object.get(key).and_then(Value::as_u64)
3230}
3231
3232fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3233 object.get(key).and_then(Value::as_str)
3234}
3235
3236fn normalize_time_window(
3237 now_seconds: i64,
3238 after: Option<i64>,
3239 before: Option<i64>,
3240) -> (Option<u64>, Option<u64>) {
3241 let mut after = after.unwrap_or(0);
3242 let mut before = before.unwrap_or(0);
3243
3244 if after == 0 && before == 0 {
3245 before = now_seconds;
3246 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3247 } else {
3248 (after, before) = relative_window_to_absolute(now_seconds, after, before);
3249 }
3250
3251 if after > before {
3252 std::mem::swap(&mut after, &mut before);
3253 }
3254 if after == before {
3255 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3256 }
3257
3258 (
3259 Some(normalize_timestamp_to_usec_with_rounding(
3260 after.max(0) as u64,
3261 false,
3262 )),
3263 Some(normalize_timestamp_to_usec_with_rounding(
3264 before.max(0) as u64,
3265 true,
3266 )),
3267 )
3268}
3269
3270fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3271 let mut after = after;
3272 let mut before = before;
3273
3274 if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3275 if before > 0 {
3276 before = -before;
3277 }
3278 before = now_seconds.saturating_add(before);
3279 }
3280
3281 if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3282 if after > 0 {
3283 after = -after;
3284 }
3285 if after == 0 {
3286 after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3287 }
3288 after = before.saturating_add(after).saturating_add(1);
3289 }
3290
3291 if after > before {
3292 std::mem::swap(&mut after, &mut before);
3293 }
3294
3295 if before > now_seconds {
3296 let delta = before.saturating_sub(now_seconds);
3297 before = before.saturating_sub(delta);
3298 after = after.saturating_sub(delta);
3299 }
3300
3301 (after, before)
3302}
3303
3304struct RequestEchoInput<'a> {
3305 info: bool,
3306 after_realtime_usec: Option<u64>,
3307 before_realtime_usec: Option<u64>,
3308 if_modified_since_usec: u64,
3309 anchor: ExplorerAnchor,
3310 direction: Direction,
3311 limit: usize,
3312 data_only: bool,
3313 delta: bool,
3314 tail: bool,
3315 sampling: u64,
3316 source_type: u64,
3317 requested_facets: Option<&'a [String]>,
3318 selections: Option<&'a Value>,
3319 histogram: Option<&'a str>,
3320 query: Option<&'a str>,
3321}
3322
3323fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3324 let anchor_usec = match input.anchor {
3325 ExplorerAnchor::Realtime(usec) => usec,
3326 ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3327 };
3328 let mut out = json!({
3329 "info": input.info,
3330 "slice": true,
3334 "data_only": input.data_only,
3335 "delta": input.delta,
3336 "tail": input.tail,
3337 "sampling": input.sampling,
3338 "source_type": input.source_type,
3339 "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3340 "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3341 "if_modified_since": input.if_modified_since_usec,
3342 "anchor": anchor_usec,
3343 "direction": match input.direction {
3344 Direction::Forward => "forward",
3345 Direction::Backward => "backward",
3346 },
3347 "last": input.limit,
3348 "query": input.query,
3349 "histogram": input.histogram,
3350 });
3351 if let Some(facets) = input.requested_facets {
3352 if let Some(object) = out.as_object_mut() {
3353 object.insert(
3354 "facets".to_string(),
3355 facets
3356 .iter()
3357 .map(|field| Value::String(field.clone()))
3358 .collect(),
3359 );
3360 }
3361 }
3362 if let Some(Value::Object(selections)) = input.selections {
3363 let mut selections = selections.clone();
3364 if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3365 for source in sources {
3366 *source = Value::Null;
3367 }
3368 }
3369 if let Some(object) = out.as_object_mut() {
3370 object.insert("selections".to_string(), Value::Object(selections));
3371 }
3372 }
3373 out
3374}
3375
3376fn normalize_timestamp_to_usec(value: u64) -> u64 {
3377 normalize_timestamp_to_usec_with_rounding(value, false)
3378}
3379
3380fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
3381 if value >= 1_000_000_000_000 {
3382 value
3383 } else if end_of_second {
3384 value.saturating_mul(1_000_000).saturating_add(999_999)
3385 } else {
3386 value.saturating_mul(1_000_000)
3387 }
3388}
3389
3390fn unix_now_seconds() -> i64 {
3391 SystemTime::now()
3392 .duration_since(UNIX_EPOCH)
3393 .map(|duration| duration.as_secs() as i64)
3394 .unwrap_or_default()
3395}
3396
3397fn human_binary_size(bytes: u64) -> String {
3398 const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];
3399 let mut value = bytes as f64;
3400 let mut unit = 0usize;
3401 while value >= 1024.0 && unit + 1 < UNITS.len() {
3402 value /= 1024.0;
3403 unit += 1;
3404 }
3405 if unit == 0 {
3406 format!("{}{}", bytes, UNITS[unit])
3407 } else if value.fract() == 0.0 {
3408 format!("{value:.0}{}", UNITS[unit])
3409 } else {
3410 let mut formatted = format!("{value:.2}");
3411 while formatted.contains('.') && formatted.ends_with('0') {
3412 formatted.pop();
3413 }
3414 if formatted.ends_with('.') {
3415 formatted.pop();
3416 }
3417 format!("{formatted}{}", UNITS[unit])
3418 }
3419}
3420
3421fn human_duration_seconds(seconds: u64) -> String {
3422 let years = seconds / (365 * 86_400);
3423 let seconds = seconds % (365 * 86_400);
3424 let months = seconds / (30 * 86_400);
3425 let seconds = seconds % (30 * 86_400);
3426 let days = seconds / 86_400;
3427 let seconds = seconds % 86_400;
3428 let hours = seconds / 3600;
3429 let minutes = (seconds % 3600) / 60;
3430 let seconds = seconds % 60;
3431 let mut parts = Vec::new();
3432 if years != 0 {
3433 parts.push(format!("{years}y"));
3434 }
3435 if months != 0 {
3436 parts.push(format!("{months}mo"));
3437 }
3438 if days != 0 {
3439 parts.push(format!("{days}d"));
3440 }
3441 if hours != 0 {
3442 parts.push(format!("{hours}h"));
3443 }
3444 if minutes != 0 {
3445 parts.push(format!("{minutes}m"));
3446 }
3447 if seconds != 0 || parts.is_empty() {
3448 parts.push(format!("{seconds}s"));
3449 }
3450 parts.join(" ")
3451}
3452
3453#[derive(Debug, Default)]
3454struct JournalFileCollection {
3455 files: Vec<PathBuf>,
3456 skipped: u64,
3457 errors: Vec<String>,
3458}
3459
3460fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
3461 if !path.is_dir() {
3462 return Err(SdkError::InvalidPath(format!(
3463 "not a directory: {}",
3464 path.display()
3465 )));
3466 }
3467 let mut collection = JournalFileCollection::default();
3468 let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
3469 let mut visited = HashSet::new();
3470 while let Some((directory, depth)) = pending.pop_front() {
3471 let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
3472 if visited.contains(&visited_key) {
3473 continue;
3474 }
3475 if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
3476 collection.skipped = collection.skipped.saturating_add(1);
3477 collection.errors.push(format!(
3478 "{}: directory scan limit reached",
3479 directory.display()
3480 ));
3481 continue;
3482 }
3483 visited.insert(visited_key);
3484 let entries = match std::fs::read_dir(&directory) {
3485 Ok(entries) => entries,
3486 Err(err) if directory == path => return Err(err.into()),
3487 Err(err) => {
3488 collection.skipped = collection.skipped.saturating_add(1);
3489 collection
3490 .errors
3491 .push(format!("{}: {err}", directory.display()));
3492 continue;
3493 }
3494 };
3495 for entry in entries.flatten() {
3496 let entry_path = entry.path();
3497 if entry_path.is_file() && is_journal_file_name(&entry_path) {
3498 collection.files.push(entry_path);
3499 } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
3500 pending.push_back((entry_path, depth + 1));
3501 }
3502 }
3503 }
3504 collection.files.sort();
3505 dedup_journal_files_by_canonical_path(&mut collection.files);
3506 Ok(collection)
3507}
3508
3509fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
3510 let mut seen = HashSet::new();
3511 files.retain(|path| {
3512 let key = std::fs::canonicalize(path).unwrap_or_else(|_| path.clone());
3513 seen.insert(key)
3514 });
3515}
3516
3517#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3518struct JournalFileOrderInfo {
3519 msg_first_realtime_usec: u64,
3520 msg_last_realtime_usec: u64,
3521 file_last_modified_usec: u64,
3522 journal_vs_realtime_delta_usec: u64,
3523}
3524
3525fn journal_file_order_info(
3526 path: &Path,
3527 reader_options: ReaderOptions,
3528 metadata: Option<&NetdataJournalFileMetadata>,
3529) -> JournalFileOrderInfo {
3530 let file_last_modified_usec = std::fs::metadata(path)
3531 .ok()
3532 .and_then(|metadata| metadata.modified().ok())
3533 .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3534 .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3535 .unwrap_or_default();
3536 let file_last_modified_usec = metadata
3537 .and_then(|metadata| metadata.file_last_modified_usec)
3538 .unwrap_or(file_last_modified_usec);
3539 let journal_vs_realtime_delta_usec = metadata
3540 .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3541 .map(normalize_journal_vs_realtime_delta_usec)
3542 .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3543
3544 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3545 return JournalFileOrderInfo {
3546 msg_first_realtime_usec: 0,
3547 msg_last_realtime_usec: file_last_modified_usec,
3548 file_last_modified_usec,
3549 journal_vs_realtime_delta_usec,
3550 };
3551 };
3552 let header = reader.header();
3553 let msg_first_realtime_usec = metadata
3554 .and_then(|metadata| metadata.msg_first_realtime_usec)
3555 .unwrap_or(header.head_entry_realtime);
3556 let msg_last_realtime_usec = metadata
3557 .and_then(|metadata| metadata.msg_last_realtime_usec)
3558 .unwrap_or_else(|| {
3559 if header.tail_entry_realtime == 0 {
3560 file_last_modified_usec
3561 } else {
3562 header.tail_entry_realtime
3563 }
3564 });
3565 JournalFileOrderInfo {
3566 msg_first_realtime_usec,
3567 msg_last_realtime_usec,
3568 file_last_modified_usec,
3569 journal_vs_realtime_delta_usec,
3570 }
3571}
3572
3573fn compare_journal_file_order(
3574 left: &JournalFileOrderInfo,
3575 right: &JournalFileOrderInfo,
3576 direction: Direction,
3577) -> Ordering {
3578 let backward = right
3579 .msg_last_realtime_usec
3580 .cmp(&left.msg_last_realtime_usec)
3581 .then_with(|| {
3582 right
3583 .file_last_modified_usec
3584 .cmp(&left.file_last_modified_usec)
3585 })
3586 .then_with(|| {
3587 right
3588 .msg_first_realtime_usec
3589 .cmp(&left.msg_first_realtime_usec)
3590 });
3591 match direction {
3592 Direction::Backward => backward,
3593 Direction::Forward => backward.reverse(),
3594 }
3595}
3596
3597fn is_journal_file_name(path: &Path) -> bool {
3598 path.file_name()
3599 .and_then(|name| name.to_str())
3600 .is_some_and(|name| {
3601 name.ends_with(".journal")
3602 || name.ends_with(".journal~")
3603 || name.ends_with(".journal.zst")
3604 || name.ends_with(".journal~.zst")
3605 })
3606}
3607
3608fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3609 for value in values {
3610 push_unique(target, value);
3611 }
3612}
3613
3614fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
3615 fields
3616 .iter()
3617 .filter_map(|field| String::from_utf8(field.clone()).ok())
3618 .collect()
3619}
3620
3621fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
3622 let value = value.as_ref();
3623 if !target.iter().any(|existing| existing == value) {
3624 target.push(value.to_string());
3625 }
3626}
3627
3628fn netdata_reorder_key(value: &str) -> String {
3629 value
3630 .trim_start_matches(|character: char| character.is_ascii_punctuation())
3631 .to_ascii_lowercase()
3632}
3633
3634fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
3635 histogram
3636 .buckets
3637 .first()
3638 .map(|bucket| {
3639 bucket
3640 .end_realtime_usec
3641 .saturating_sub(bucket.start_realtime_usec)
3642 .checked_div(1_000_000)
3643 .unwrap_or(1)
3644 .max(1)
3645 })
3646 .unwrap_or(1)
3647}
3648
3649enum TimestampPrecision {
3650 Seconds,
3651 Micros,
3652}
3653
3654fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
3655 let seconds = (timestamp / 1_000_000) as i64;
3656 let micros = (timestamp % 1_000_000) as u32;
3657 DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
3658 .map(|datetime| match precision {
3659 TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3660 TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
3661 })
3662 .unwrap_or_else(|| timestamp.to_string())
3663}
3664
3665fn priority_name(raw: &str) -> Option<&'static str> {
3666 match parse_priority(raw)? {
3667 0 => Some("panic"),
3668 1 => Some("alert"),
3669 2 => Some("critical"),
3670 3 => Some("error"),
3671 4 => Some("warning"),
3672 5 => Some("notice"),
3673 6 => Some("info"),
3674 7 => Some("debug"),
3675 _ => None,
3676 }
3677}
3678
3679fn priority_name_to_number(value: &str) -> Option<&'static str> {
3680 match value {
3681 "panic" | "emergency" | "emerg" => Some("0"),
3682 "alert" => Some("1"),
3683 "critical" | "crit" => Some("2"),
3684 "error" | "err" => Some("3"),
3685 "warning" | "warn" => Some("4"),
3686 "notice" => Some("5"),
3687 "info" => Some("6"),
3688 "debug" => Some("7"),
3689 _ => None,
3690 }
3691}
3692
3693fn parse_priority(raw: &str) -> Option<u8> {
3694 raw.parse::<u8>().ok()
3695}
3696
3697fn priority_to_row_severity(raw: &[u8]) -> &'static str {
3698 let raw = String::from_utf8_lossy(raw);
3699 match parse_priority(&raw) {
3700 Some(priority) if priority <= 3 => "critical",
3701 Some(4) => "warning",
3702 Some(5) => "notice",
3703 Some(priority) if priority >= 7 => "debug",
3704 _ => "normal",
3705 }
3706}
3707
3708fn syslog_facility_name(raw: &str) -> Option<&'static str> {
3709 match raw.parse::<u8>().ok()? {
3710 0 => Some("kern"),
3711 1 => Some("user"),
3712 2 => Some("mail"),
3713 3 => Some("daemon"),
3714 4 => Some("auth"),
3715 5 => Some("syslog"),
3716 6 => Some("lpr"),
3717 7 => Some("news"),
3718 8 => Some("uucp"),
3719 9 => Some("cron"),
3720 10 => Some("authpriv"),
3721 11 => Some("ftp"),
3722 16 => Some("local0"),
3723 17 => Some("local1"),
3724 18 => Some("local2"),
3725 19 => Some("local3"),
3726 20 => Some("local4"),
3727 21 => Some("local5"),
3728 22 => Some("local6"),
3729 23 => Some("local7"),
3730 _ => None,
3731 }
3732}
3733
3734const ERRNO_NAMES: &[(u32, &str)] = &[
3735 (1, "EPERM"),
3736 (2, "ENOENT"),
3737 (3, "ESRCH"),
3738 (4, "EINTR"),
3739 (5, "EIO"),
3740 (6, "ENXIO"),
3741 (7, "E2BIG"),
3742 (8, "ENOEXEC"),
3743 (9, "EBADF"),
3744 (10, "ECHILD"),
3745 (11, "EAGAIN"),
3746 (12, "ENOMEM"),
3747 (13, "EACCES"),
3748 (14, "EFAULT"),
3749 (15, "ENOTBLK"),
3750 (16, "EBUSY"),
3751 (17, "EEXIST"),
3752 (18, "EXDEV"),
3753 (19, "ENODEV"),
3754 (20, "ENOTDIR"),
3755 (21, "EISDIR"),
3756 (22, "EINVAL"),
3757 (23, "ENFILE"),
3758 (24, "EMFILE"),
3759 (25, "ENOTTY"),
3760 (26, "ETXTBSY"),
3761 (27, "EFBIG"),
3762 (28, "ENOSPC"),
3763 (29, "ESPIPE"),
3764 (30, "EROFS"),
3765 (31, "EMLINK"),
3766 (32, "EPIPE"),
3767 (33, "EDOM"),
3768 (34, "ERANGE"),
3769 (35, "EDEADLK"),
3770 (36, "ENAMETOOLONG"),
3771 (37, "ENOLCK"),
3772 (38, "ENOSYS"),
3773 (39, "ENOTEMPTY"),
3774 (40, "ELOOP"),
3775 (42, "ENOMSG"),
3776 (43, "EIDRM"),
3777 (44, "ECHRNG"),
3778 (45, "EL2NSYNC"),
3779 (46, "EL3HLT"),
3780 (47, "EL3RST"),
3781 (48, "ELNRNG"),
3782 (49, "EUNATCH"),
3783 (50, "ENOCSI"),
3784 (51, "EL2HLT"),
3785 (52, "EBADE"),
3786 (53, "EBADR"),
3787 (54, "EXFULL"),
3788 (55, "ENOANO"),
3789 (56, "EBADRQC"),
3790 (57, "EBADSLT"),
3791 (59, "EBFONT"),
3792 (60, "ENOSTR"),
3793 (61, "ENODATA"),
3794 (62, "ETIME"),
3795 (63, "ENOSR"),
3796 (64, "ENONET"),
3797 (65, "ENOPKG"),
3798 (66, "EREMOTE"),
3799 (67, "ENOLINK"),
3800 (68, "EADV"),
3801 (69, "ESRMNT"),
3802 (70, "ECOMM"),
3803 (71, "EPROTO"),
3804 (72, "EMULTIHOP"),
3805 (73, "EDOTDOT"),
3806 (74, "EBADMSG"),
3807 (75, "EOVERFLOW"),
3808 (76, "ENOTUNIQ"),
3809 (77, "EBADFD"),
3810 (78, "EREMCHG"),
3811 (79, "ELIBACC"),
3812 (80, "ELIBBAD"),
3813 (81, "ELIBSCN"),
3814 (82, "ELIBMAX"),
3815 (83, "ELIBEXEC"),
3816 (84, "EILSEQ"),
3817 (85, "ERESTART"),
3818 (86, "ESTRPIPE"),
3819 (87, "EUSERS"),
3820 (88, "ENOTSOCK"),
3821 (89, "EDESTADDRREQ"),
3822 (90, "EMSGSIZE"),
3823 (91, "EPROTOTYPE"),
3824 (92, "ENOPROTOOPT"),
3825 (93, "EPROTONOSUPPORT"),
3826 (94, "ESOCKTNOSUPPORT"),
3827 (95, "ENOTSUP"),
3828 (96, "EPFNOSUPPORT"),
3829 (97, "EAFNOSUPPORT"),
3830 (98, "EADDRINUSE"),
3831 (99, "EADDRNOTAVAIL"),
3832 (100, "ENETDOWN"),
3833 (101, "ENETUNREACH"),
3834 (102, "ENETRESET"),
3835 (103, "ECONNABORTED"),
3836 (104, "ECONNRESET"),
3837 (105, "ENOBUFS"),
3838 (106, "EISCONN"),
3839 (107, "ENOTCONN"),
3840 (108, "ESHUTDOWN"),
3841 (109, "ETOOMANYREFS"),
3842 (110, "ETIMEDOUT"),
3843 (111, "ECONNREFUSED"),
3844 (112, "EHOSTDOWN"),
3845 (113, "EHOSTUNREACH"),
3846 (114, "EALREADY"),
3847 (115, "EINPROGRESS"),
3848 (116, "ESTALE"),
3849 (117, "EUCLEAN"),
3850 (118, "ENOTNAM"),
3851 (119, "ENAVAIL"),
3852 (120, "EISNAM"),
3853 (121, "EREMOTEIO"),
3854 (122, "EDQUOT"),
3855 (123, "ENOMEDIUM"),
3856 (124, "EMEDIUMTYPE"),
3857 (125, "ECANCELED"),
3858 (126, "ENOKEY"),
3859 (127, "EKEYEXPIRED"),
3860 (128, "EKEYREVOKED"),
3861 (129, "EKEYREJECTED"),
3862 (130, "EOWNERDEAD"),
3863 (131, "ENOTRECOVERABLE"),
3864 (132, "ERFKILL"),
3865 (133, "EHWPOISON"),
3866];
3867
3868fn errno_name(raw: &str) -> Option<String> {
3869 let errno = raw.parse::<u32>().ok()?;
3870 let name = ERRNO_NAMES
3871 .iter()
3872 .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
3873 Some(format!("{errno} ({name})"))
3874}
3875
3876fn cap_effective_display(raw: &str) -> String {
3877 if !raw.bytes().next().is_some_and(|byte| byte.is_ascii_digit()) {
3878 return raw.to_string();
3879 }
3880 let Ok(value) = u64::from_str_radix(raw, 16) else {
3881 return raw.to_string();
3882 };
3883 if value == 0 {
3884 return raw.to_string();
3885 }
3886 const CAPABILITIES: &[&str] = &[
3887 "CHOWN",
3888 "DAC_OVERRIDE",
3889 "DAC_READ_SEARCH",
3890 "FOWNER",
3891 "FSETID",
3892 "KILL",
3893 "SETGID",
3894 "SETUID",
3895 "SETPCAP",
3896 "LINUX_IMMUTABLE",
3897 "NET_BIND_SERVICE",
3898 "NET_BROADCAST",
3899 "NET_ADMIN",
3900 "NET_RAW",
3901 "IPC_LOCK",
3902 "IPC_OWNER",
3903 "SYS_MODULE",
3904 "SYS_RAWIO",
3905 "SYS_CHROOT",
3906 "SYS_PTRACE",
3907 "SYS_PACCT",
3908 "SYS_ADMIN",
3909 "SYS_BOOT",
3910 "SYS_NICE",
3911 "SYS_RESOURCE",
3912 "SYS_TIME",
3913 "SYS_TTY_CONFIG",
3914 "MKNOD",
3915 "LEASE",
3916 "AUDIT_WRITE",
3917 "AUDIT_CONTROL",
3918 "SETFCAP",
3919 "MAC_OVERRIDE",
3920 "MAC_ADMIN",
3921 "SYSLOG",
3922 "WAKE_ALARM",
3923 "BLOCK_SUSPEND",
3924 "AUDIT_READ",
3925 "PERFMON",
3926 "BPF",
3927 "CHECKPOINT_RESTORE",
3928 ];
3929 let names: Vec<&str> = CAPABILITIES
3930 .iter()
3931 .enumerate()
3932 .filter_map(|(index, name)| ((value & (1u64 << index)) != 0).then_some(*name))
3933 .collect();
3934 if names.is_empty() {
3935 raw.to_string()
3936 } else {
3937 format!("{raw} ({})", names.join(" | "))
3938 }
3939}
3940
3941fn systemd_field_display_value(
3942 context: &DisplayContext,
3943 scope: DisplayScope,
3944 field: &str,
3945 value: &[u8],
3946 resolve_user_group_names: bool,
3947) -> Value {
3948 let raw = String::from_utf8_lossy(value);
3949 match field {
3950 "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
3951 "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
3952 "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
3953 "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
3954 (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
3955 (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
3956 (None, _) => raw.into_owned(),
3957 }),
3958 "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
3959 (Some(timestamp), DisplayScope::Data) => format!(
3960 "{} ({}) ",
3961 raw,
3962 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
3963 ),
3964 (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
3965 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
3966 }
3967 (None, _) => raw.into_owned(),
3968 }),
3969 "_UID"
3970 | "_SYSTEMD_OWNER_UID"
3971 | "OBJECT_SYSTEMD_OWNER_UID"
3972 | "OBJECT_UID"
3973 | "_AUDIT_LOGINUID"
3974 | "OBJECT_AUDIT_LOGINUID" => {
3975 if resolve_user_group_names {
3976 Value::String(cached_uid_display(context, raw.as_ref()))
3977 } else {
3978 Value::String(raw.into_owned())
3979 }
3980 }
3981 "_GID" | "OBJECT_GID" => {
3982 if resolve_user_group_names {
3983 Value::String(cached_gid_display(context, raw.as_ref()))
3984 } else {
3985 Value::String(raw.into_owned())
3986 }
3987 }
3988 "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
3989 "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
3990 Ok(timestamp) if timestamp != 0 => {
3991 format!(
3992 "{} ({})",
3993 raw,
3994 format_realtime_usec(timestamp, TimestampPrecision::Micros)
3995 )
3996 }
3997 _ => raw.into_owned(),
3998 }),
3999 _ => Value::String(raw.into_owned()),
4000 }
4001}
4002
4003fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4004 if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4005 return value.clone();
4006 }
4007 let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4008 context
4009 .uid_display_cache
4010 .borrow_mut()
4011 .insert(raw.to_string(), value.clone());
4012 value
4013}
4014
4015fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4016 if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4017 return value.clone();
4018 }
4019 let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4020 context
4021 .gid_display_cache
4022 .borrow_mut()
4023 .insert(raw.to_string(), value.clone());
4024 value
4025}
4026
4027#[cfg(unix)]
4028fn resolve_uid_name(raw: &str) -> Option<String> {
4029 let uid = raw.parse::<libc::uid_t>().ok()?;
4030 let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4031 let mut result = std::ptr::null_mut();
4032 let mut buffer = vec![0i8; 16_384];
4033 let rc = unsafe {
4034 libc::getpwuid_r(
4035 uid,
4036 pwd.as_mut_ptr(),
4037 buffer.as_mut_ptr(),
4038 buffer.len(),
4039 &mut result,
4040 )
4041 };
4042 if rc != 0 || result.is_null() {
4043 return None;
4044 }
4045 let pwd = unsafe { pwd.assume_init() };
4046 Some(
4047 unsafe { CStr::from_ptr(pwd.pw_name) }
4048 .to_string_lossy()
4049 .into_owned(),
4050 )
4051}
4052
4053#[cfg(not(unix))]
4054fn resolve_uid_name(_raw: &str) -> Option<String> {
4055 None
4056}
4057
4058#[cfg(unix)]
4059fn resolve_gid_name(raw: &str) -> Option<String> {
4060 let gid = raw.parse::<libc::gid_t>().ok()?;
4061 let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4062 let mut result = std::ptr::null_mut();
4063 let mut buffer = vec![0i8; 16_384];
4064 let rc = unsafe {
4065 libc::getgrgid_r(
4066 gid,
4067 grp.as_mut_ptr(),
4068 buffer.as_mut_ptr(),
4069 buffer.len(),
4070 &mut result,
4071 )
4072 };
4073 if rc != 0 || result.is_null() {
4074 return None;
4075 }
4076 let grp = unsafe { grp.assume_init() };
4077 Some(
4078 unsafe { CStr::from_ptr(grp.gr_name) }
4079 .to_string_lossy()
4080 .into_owned(),
4081 )
4082}
4083
4084#[cfg(not(unix))]
4085fn resolve_gid_name(_raw: &str) -> Option<String> {
4086 None
4087}
4088
4089const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
4090 ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
4091 ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
4092 (
4093 "a596d6fe7bfa4994828e72309e95d61e",
4094 "Journal messages suppressed",
4095 ),
4096 (
4097 "e9bf28e6e834481bb6f48f548ad13606",
4098 "Journal messages missed",
4099 ),
4100 (
4101 "ec387f577b844b8fa948f33cad9a75e6",
4102 "Journal disk space usage",
4103 ),
4104 ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
4105 ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
4106 ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
4107 ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
4108 (
4109 "3354939424b4456d9802ca8333ed424a",
4110 "User Session terminated",
4111 ),
4112 ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
4113 ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
4114 (
4115 "24d8d4452573402496068381a6312df2",
4116 "VM or container started",
4117 ),
4118 (
4119 "58432bd3bace477cb514b56381b8a758",
4120 "VM or container stopped",
4121 ),
4122 ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
4123 ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
4124 (
4125 "50876a9db00f4c40bde1a2ad381c3a1b",
4126 "System configuration issues",
4127 ),
4128 (
4129 "b07a249cd024414a82dd00cd181378ff",
4130 "System start-up completed",
4131 ),
4132 (
4133 "eed00a68ffd84e31882105fd973abdd1",
4134 "User start-up completed",
4135 ),
4136 ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
4137 ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
4138 (
4139 "98268866d1d54a499c4e98921d93bc40",
4140 "System shutdown initiated",
4141 ),
4142 (
4143 "c14aaf76ec284a5fa1f105f88dfb061c",
4144 "System factory reset initiated",
4145 ),
4146 ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
4147 (
4148 "3ed0163e868a4417ab8b9e210407a96c",
4149 "System reboot failed after crash",
4150 ),
4151 ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
4152 (
4153 "5addb3a06a734d3396b794bf98fb2d01",
4154 "Init crashed no coredump",
4155 ),
4156 ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
4157 (
4158 "5e6f1f5e4db64a0eaee3368249d20b94",
4159 "Init crashed unknown signal",
4160 ),
4161 (
4162 "83f84b35ee264f74a3896a9717af34cb",
4163 "Init crashed systemd signal",
4164 ),
4165 (
4166 "3a73a98baf5b4b199929e3226c0be783",
4167 "Init crashed process signal",
4168 ),
4169 (
4170 "2ed18d4f78ca47f0a9bc25271c26adb4",
4171 "Init crashed waitpid failed",
4172 ),
4173 (
4174 "56b1cd96f24246c5b607666fda952356",
4175 "Init crashed coredump failed",
4176 ),
4177 ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
4178 (
4179 "38e8b1e039ad469291b18b44c553a5b7",
4180 "Crash shell failed to fork",
4181 ),
4182 (
4183 "872729b47dbe473eb768ccecd477beda",
4184 "Crash shell failed to execute",
4185 ),
4186 ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
4187 ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
4188 (
4189 "267437d33fdd41099ad76221cc24a335",
4190 "Battery low powering off",
4191 ),
4192 (
4193 "79e05b67bc4545d1922fe47107ee60c5",
4194 "Manager mainloop failed",
4195 ),
4196 ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
4197 (
4198 "ed158c2df8884fa584eead2d902c1032",
4199 "Init failed to drop capability bounding set of usermode",
4200 ),
4201 (
4202 "42695b500df048298bee37159caa9f2e",
4203 "Init failed to drop capability bounding set",
4204 ),
4205 (
4206 "bfc2430724ab44499735b4f94cca9295",
4207 "User manager can't disable new privileges",
4208 ),
4209 (
4210 "59288af523be43a28d494e41e26e4510",
4211 "Manager failed to start default target",
4212 ),
4213 (
4214 "689b4fcc97b4486ea5da92db69c9e314",
4215 "Manager failed to isolate default target",
4216 ),
4217 (
4218 "5ed836f1766f4a8a9fc5da45aae23b29",
4219 "Manager failed to collect passed file descriptors",
4220 ),
4221 (
4222 "6a40fbfbd2ba4b8db02fb40c9cd090d7",
4223 "Init failed to fix up environment variables",
4224 ),
4225 (
4226 "0e54470984ac419689743d957a119e2e",
4227 "Manager failed to allocate",
4228 ),
4229 (
4230 "d67fa9f847aa4b048a2ae33535331adb",
4231 "Manager failed to write Smack",
4232 ),
4233 (
4234 "af55a6f75b544431b72649f36ff6d62c",
4235 "System shutdown critical error",
4236 ),
4237 (
4238 "d18e0339efb24a068d9c1060221048c2",
4239 "Init failed to fork off valgrind",
4240 ),
4241 ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
4242 ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
4243 ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
4244 ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
4245 ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
4246 ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
4247 ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
4248 ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
4249 ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
4250 ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
4251 ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
4252 ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
4253 (
4254 "641257651c1b4ec9a8624d7a40a9e1e7",
4255 "Process execution failed",
4256 ),
4257 ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
4258 ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
4259 (
4260 "1dee0369c7fc4736b7099b38ecb46ee7",
4261 "Mount point is not empty",
4262 ),
4263 ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
4264 ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
4265 ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
4266 ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
4267 ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
4268 ("51e171bd585248568110144c517cca53", "System undocked"),
4269 ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
4270 ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
4271 ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
4272 ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
4273 ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
4274 ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
4275 ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
4276 (
4277 "167836df6f7f428e98147227b2dc8945",
4278 "Hibernate key long press",
4279 ),
4280 ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
4281 (
4282 "1675d7f172174098b1108bf8c7dc8f5d",
4283 "DNSSEC validation failed",
4284 ),
4285 (
4286 "4d4408cfd0d144859184d1e65d7c8a65",
4287 "DNSSEC trust anchor revoked",
4288 ),
4289 ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
4290 ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
4291 (
4292 "1b3bb94037f04bbf81028e135a12d293",
4293 "Mount point path not suitable",
4294 ),
4295 (
4296 "010190138f494e29a0ef6669749531aa",
4297 "Device path not suitable",
4298 ),
4299 ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
4300 (
4301 "1c0454c1bd2241e0ac6fefb4bc631433",
4302 "Systemd udev settle deprecated",
4303 ),
4304 ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
4305 ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
4306 ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
4307 ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
4308 ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
4309 ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
4310 ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
4311 (
4312 "187c62eb1e7f463bb530394f52cb090f",
4313 "Portable Service attached",
4314 ),
4315 (
4316 "76c5c754d628490d8ecba4c9d042112b",
4317 "Portable Service detached",
4318 ),
4319 (
4320 "9cf56b8baf9546cf9478783a8de42113",
4321 "systemd-networkd sysctl changed by foreign process",
4322 ),
4323 (
4324 "ad7089f928ac4f7ea00c07457d47ba8a",
4325 "SRK into TPM authorization failure",
4326 ),
4327 (
4328 "b2bcbaf5edf948e093ce50bbea0e81ec",
4329 "Secure Attention Key (SAK) was pressed",
4330 ),
4331 ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
4332 (
4333 "ee9799dab1e24d81b7bee7759a543e1b",
4334 "dbus activate masked unit",
4335 ),
4336 ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
4337 ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
4338 ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
4339 ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
4340 (
4341 "b209c0d9d1764ab38d13b8e00d1784d6",
4342 "dbus protocol violation",
4343 ),
4344 ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
4345 (
4346 "0ce0fa61d1a9433dabd67417f6b8e535",
4347 "dbus service failed open",
4348 ),
4349 ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
4350 ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
4351 (
4352 "0ce153587afa4095832d233c17a88001",
4353 "Gnome SM startup succeeded",
4354 ),
4355 (
4356 "10dd2dc188b54a5e98970f56499d1f73",
4357 "Gnome SM unrecoverable failure",
4358 ),
4359 ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
4360 ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
4361 ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
4362 ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
4363 ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
4364 ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
4365 (
4366 "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
4367 "Netdata connection from child",
4368 ),
4369 (
4370 "6e2e3839067648968b646045dbf28d66",
4371 "Netdata connection to parent",
4372 ),
4373 (
4374 "9ce0cb58ab8b44df82c4bf1ad9ee22de",
4375 "Netdata alert transition",
4376 ),
4377 (
4378 "6db0018e83e34320ae2a659d78019fb7",
4379 "Netdata alert notification",
4380 ),
4381 ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
4382 (
4383 "8ddaf5ba33a74078b609250db1e951f3",
4384 "Sensor state transition",
4385 ),
4386 (
4387 "ec87a56120d5431bace51e2fb8bba243",
4388 "Netdata log flood protection",
4389 ),
4390 (
4391 "acb33cb95778476baac702eb7e4e151d",
4392 "Netdata Cloud connection",
4393 ),
4394 (
4395 "d1f59606dd4d41e3b217a0cfcae8e632",
4396 "Netdata extreme cardinality",
4397 ),
4398 ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
4399 (
4400 "4fdf40816c124623a032b7fe73beacb8",
4401 "Netdata dynamic configuration",
4402 ),
4403];
4404
4405fn message_id_name(raw: &str) -> Option<&'static str> {
4406 MESSAGE_ID_NAMES
4407 .iter()
4408 .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4409}
4410
4411#[cfg(test)]
4412mod tests {
4413 use super::*;
4414 use crate::ExplorerHistogramBucket;
4415 use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4416 use journal_core::repository::File as RepoFile;
4417 use std::cell::Cell;
4418 use std::collections::HashMap;
4419 use tempfile::TempDir;
4420
4421 #[derive(Default)]
4422 struct TestNetdataState {
4423 metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
4424 updates: Vec<(PathBuf, u64)>,
4425 }
4426
4427 impl NetdataFunctionState for TestNetdataState {
4428 fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4429 self.metadata.get(path).cloned()
4430 }
4431
4432 fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4433 self.updates.push((path.to_path_buf(), delta_usec));
4434 }
4435 }
4436
4437 fn test_uuid(seed: u8) -> uuid::Uuid {
4438 uuid::Uuid::from_bytes([seed; 16])
4439 }
4440
4441 fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4442 write_named_netdata_test_journal(
4443 directory,
4444 "netdata-api-test.journal",
4445 count,
4446 1_700_000_000_000_000,
4447 );
4448 }
4449
4450 fn write_named_netdata_test_journal(
4451 directory: &std::path::Path,
4452 name: &str,
4453 count: usize,
4454 start_realtime_usec: u64,
4455 ) {
4456 write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4457 }
4458
4459 fn write_stepped_netdata_test_journal(
4460 directory: &std::path::Path,
4461 name: &str,
4462 count: usize,
4463 start_realtime_usec: u64,
4464 step_realtime_usec: u64,
4465 ) {
4466 std::fs::create_dir_all(directory).expect("create journal dir");
4467 let path = directory.join(name);
4468 let repo_file = RepoFile::from_path(&path).expect("repo file");
4469 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4470 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4471 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4472 for index in 0..count {
4473 let message = format!("MESSAGE=row-{index}");
4474 let service = if index % 2 == 0 {
4475 b"SERVICE=even".as_slice()
4476 } else {
4477 b"SERVICE=odd".as_slice()
4478 };
4479 let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4480 let realtime = start_realtime_usec
4481 .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4482 writer
4483 .add_entry(&mut file, &payloads, realtime, realtime)
4484 .expect("write entry");
4485 }
4486 file.sync().expect("sync journal");
4487 }
4488
4489 #[test]
4490 fn parses_netdata_selections_as_and_fields_or_values() {
4491 let request = json!({
4492 "after": 200_000_000,
4493 "before": 200_000_100,
4494 "direction": "forward",
4495 "last": 25,
4496 "facets": ["PRIORITY"],
4497 "selections": {
4498 "PRIORITY": ["warning", "error"],
4499 "_HOSTNAME": ["node-a"],
4500 "__logs_sources": ["all-local-system-logs"],
4501 }
4502 });
4503
4504 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4505 .expect("parse request");
4506 assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
4507 assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
4508 assert_eq!(parsed.direction, Direction::Forward);
4509 assert_eq!(parsed.limit, 25);
4510 assert_eq!(parsed.filters.len(), 2);
4511 assert_eq!(parsed.filters[0].field, b"PRIORITY");
4512 assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
4513 assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
4514 assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
4515 }
4516
4517 #[test]
4518 fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
4519 let request = json!({
4520 "after": 200_000_000,
4521 "before": 200_000_100,
4522 "last": 1
4523 });
4524
4525 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4526 .expect("parse request");
4527 let query =
4528 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4529
4530 assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
4531 assert_eq!(parsed.limit, 2);
4532 assert_eq!(query.limit, 2);
4533 }
4534
4535 #[test]
4536 fn netdata_facet_counts_use_native_sliced_filter_semantics() {
4537 let request = json!({
4538 "after": 200_000_000,
4539 "before": 200_000_100,
4540 "facets": ["PRIORITY"],
4541 "selections": {
4542 "PRIORITY": ["warning"]
4543 }
4544 });
4545
4546 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4547 .expect("parse request");
4548 let query =
4549 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4550
4551 assert!(!query.exclude_facet_field_filters);
4552 }
4553
4554 #[test]
4555 fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
4556 let request = json!({
4557 "after": 200_000_000,
4558 "before": 200_000_100,
4559 "facets": ["PRIORITY", "_BOOT_ID"],
4560 "selections": {
4561 "PRIORITY": ["warning"],
4562 "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
4563 }
4564 });
4565
4566 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4567 .expect("parse request");
4568 let query =
4569 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4570
4571 assert!(query.exclude_facet_field_filters);
4572 }
4573
4574 #[test]
4575 fn parses_netdata_fts_query_like_simple_pattern() {
4576 let (terms, positives, negatives) =
4577 parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
4578
4579 assert_eq!(
4580 positives,
4581 vec![
4582 b"error".to_vec(),
4583 b"warning".to_vec(),
4584 b"escaped|pipe".to_vec(),
4585 b"!literal".to_vec(),
4586 b" a*B".to_vec(),
4587 ]
4588 );
4589 assert_eq!(negatives, vec![b"debug".to_vec()]);
4590 assert_eq!(terms.len(), 6);
4591 assert!(!terms[0].negative);
4592 assert!(terms[2].negative);
4593 assert_eq!(
4594 terms[5],
4595 ExplorerFtsPattern {
4596 parts: vec![b" a".to_vec(), b"B".to_vec()],
4597 negative: false,
4598 }
4599 );
4600
4601 let request = json!({
4602 "query": r"alpha|!debug|needle\|pipe",
4603 "facets": ["PRIORITY"],
4604 });
4605 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4606 .expect("parse request");
4607 let query =
4608 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4609 assert_eq!(
4610 query.fts_patterns,
4611 vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
4612 );
4613 assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
4614 assert_eq!(query.fts_terms.len(), 3);
4615 }
4616
4617 #[test]
4618 fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
4619 let request = json!({
4620 "facets": ["PRIORITY", "_HOSTNAME"],
4621 "histogram": "PRIORITY",
4622 "last": 25
4623 });
4624
4625 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
4626 .expect("parse request");
4627 let query =
4628 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
4629
4630 assert!(!query.debug_collect_column_fields_by_row_traversal);
4631 }
4632
4633 #[test]
4634 fn netdata_function_suppresses_absent_requested_facet_groups() {
4635 let dir = TempDir::new().expect("tempdir");
4636 write_netdata_test_journal(dir.path(), 10);
4637 let request = json!({
4638 "after": 1_700_000_000,
4639 "before": 1_700_000_010,
4640 "facets": ["SERVICE", "MISSING_FIELD"],
4641 "histogram": "SERVICE",
4642 "last": 5,
4643 "slice": true
4644 });
4645 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4646
4647 let response = function
4648 .run_directory_request_json_with_options(
4649 dir.path(),
4650 &request,
4651 NetdataFunctionRunOptions::from_timeout_seconds(0),
4652 )
4653 .expect("run function");
4654
4655 let columns = response["columns"].as_object().expect("columns");
4656 assert!(columns.contains_key("SERVICE"));
4657 assert!(!columns.contains_key("MISSING_FIELD"));
4658 let facets = response["facets"].as_array().expect("facets");
4659 assert_eq!(
4660 facets
4661 .iter()
4662 .filter_map(|facet| facet["id"].as_str())
4663 .collect::<Vec<_>>(),
4664 vec!["SERVICE"]
4665 );
4666 let accepted = response["accepted_params"]
4667 .as_array()
4668 .expect("accepted params");
4669 assert!(accepted.iter().any(|value| value == "SERVICE"));
4670 assert!(!accepted.iter().any(|value| value == "MISSING_FIELD"));
4671 let histograms = response["available_histograms"]
4672 .as_array()
4673 .expect("available histograms");
4674 assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
4675 assert!(
4676 !histograms
4677 .iter()
4678 .any(|value| value["id"] == "MISSING_FIELD")
4679 );
4680 }
4681
4682 #[test]
4683 fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
4684 let dir = TempDir::new().expect("tempdir");
4685 write_netdata_test_journal(dir.path(), 10);
4686 let request = json!({
4687 "after": 1_700_000_000,
4688 "before": 1_700_000_010,
4689 "facets": ["PRIORITY"],
4690 "histogram": "PRIORITY",
4691 "selections": {
4692 "SERVICE": ["missing"]
4693 },
4694 "last": 5,
4695 "slice": true
4696 });
4697 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4698
4699 let response = function
4700 .run_directory_request_json_with_options(
4701 dir.path(),
4702 &request,
4703 NetdataFunctionRunOptions::from_timeout_seconds(0),
4704 )
4705 .expect("run function");
4706
4707 let facets = response["facets"].as_array().expect("facets");
4708 assert_eq!(facets.len(), 1);
4709 assert_eq!(facets[0]["id"], "PRIORITY");
4710 let options = facets[0]["options"].as_array().expect("options");
4711 assert!(options.iter().any(|option| {
4712 option["id"] == "6" && option["name"] == "info" && option["count"] == 0
4713 }));
4714 assert_eq!(response["items"]["matched"], 0);
4715 }
4716
4717 #[test]
4718 fn netdata_function_api_reports_progress() {
4719 let dir = TempDir::new().expect("tempdir");
4720 write_netdata_test_journal(dir.path(), 9_000);
4721 let request = json!({
4722 "after": 1_700_000_000,
4723 "before": 1_700_000_010,
4724 "facets": ["SERVICE"],
4725 "histogram": "SERVICE",
4726 "last": 0
4727 });
4728 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4729 let mut reports = 0u64;
4730 let mut progress = |progress: NetdataFunctionProgress| {
4731 reports = reports.saturating_add(1);
4732 assert_eq!(progress.current_file, 1);
4733 assert_eq!(progress.total_files, 1);
4734 assert!(progress.stats.rows_examined <= 9_000);
4735 };
4736 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4737 options.progress_interval = Duration::ZERO;
4738 options.progress_callback = Some(&mut progress);
4739
4740 let response = function
4741 .run_directory_request_json_with_options(dir.path(), &request, options)
4742 .expect("run function");
4743
4744 assert_eq!(response["status"], 200);
4745 assert!(reports > 0);
4746 assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
4747 }
4748
4749 #[test]
4750 fn netdata_function_api_reports_file_end_progress_for_small_scans() {
4751 let dir = TempDir::new().expect("tempdir");
4752 write_netdata_test_journal(dir.path(), 10);
4753 let request = json!({
4754 "after": 1_700_000_000,
4755 "before": 1_700_000_010,
4756 "facets": ["SERVICE"],
4757 "histogram": "SERVICE",
4758 "last": 0
4759 });
4760 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4761 let mut reports = 0u64;
4762 let mut last_rows_examined = 0u64;
4763 let mut progress = |progress: NetdataFunctionProgress| {
4764 reports = reports.saturating_add(1);
4765 last_rows_examined = progress.stats.rows_examined;
4766 };
4767 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4768 options.progress_callback = Some(&mut progress);
4769
4770 let response = function
4771 .run_directory_request_json_with_options(dir.path(), &request, options)
4772 .expect("run function");
4773
4774 assert_eq!(response["status"], 200);
4775 assert_eq!(reports, 1);
4776 assert_eq!(last_rows_examined, 10);
4777 }
4778
4779 #[test]
4780 fn netdata_function_progress_counts_only_query_files() {
4781 let dir = TempDir::new().expect("tempdir");
4782 write_named_netdata_test_journal(
4783 dir.path(),
4784 "old-window.journal",
4785 10,
4786 1_600_000_000_000_000,
4787 );
4788 write_named_netdata_test_journal(
4789 dir.path(),
4790 "current-window.journal",
4791 10,
4792 1_700_000_000_000_000,
4793 );
4794 let request = json!({
4795 "after": 1_700_000_000,
4796 "before": 1_700_000_010,
4797 "facets": ["SERVICE"],
4798 "histogram": "SERVICE",
4799 "last": 0
4800 });
4801 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4802 let mut reports = Vec::new();
4803 let mut progress = |progress: NetdataFunctionProgress| {
4804 reports.push((progress.current_file, progress.total_files));
4805 };
4806 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4807 options.progress_callback = Some(&mut progress);
4808
4809 let response = function
4810 .run_directory_request_json_with_options(dir.path(), &request, options)
4811 .expect("run function");
4812
4813 assert_eq!(response["status"], 200);
4814 assert_eq!(response["_journal_files"]["matched"], 1);
4815 assert_eq!(reports, vec![(1, 1)]);
4816 }
4817
4818 #[test]
4819 fn netdata_function_api_reports_cancellation() {
4820 let dir = TempDir::new().expect("tempdir");
4821 write_netdata_test_journal(dir.path(), 9_000);
4822 let request = json!({
4823 "after": 1_700_000_000,
4824 "before": 1_700_000_010,
4825 "facets": ["SERVICE"],
4826 "histogram": "SERVICE",
4827 "last": 0
4828 });
4829 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4830 let is_cancelled = || true;
4831 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4832 options.cancellation_callback = Some(&is_cancelled);
4833
4834 let response = function
4835 .run_directory_request_json_with_options(dir.path(), &request, options)
4836 .expect("run function");
4837
4838 assert_eq!(response["status"], 499);
4839 assert_eq!(response["errorMessage"], "Request cancelled.");
4840 assert_eq!(
4841 response.as_object().expect("response object").len(),
4842 2,
4843 "plugin-compatible function errors only include status and errorMessage"
4844 );
4845 }
4846
4847 #[test]
4848 fn netdata_function_api_cancels_during_active_scan() {
4849 let dir = TempDir::new().expect("tempdir");
4850 write_netdata_test_journal(dir.path(), 9_000);
4851 let request = json!({
4852 "after": 1_700_000_000,
4853 "before": 1_700_000_010,
4854 "facets": ["SERVICE"],
4855 "histogram": "SERVICE",
4856 "last": 0
4857 });
4858 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4859 let should_cancel = Cell::new(false);
4860 let mut reports = 0u64;
4861 let mut progress = |progress: NetdataFunctionProgress| {
4862 reports = reports.saturating_add(1);
4863 if progress.stats.rows_examined > 0 {
4864 should_cancel.set(true);
4865 }
4866 };
4867 let is_cancelled = || should_cancel.get();
4868 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4869 options.progress_interval = Duration::ZERO;
4870 options.progress_callback = Some(&mut progress);
4871 options.cancellation_callback = Some(&is_cancelled);
4872
4873 let response = function
4874 .run_directory_request_json_with_options(dir.path(), &request, options)
4875 .expect("run function");
4876
4877 assert_eq!(response["status"], 499);
4878 assert_eq!(response["errorMessage"], "Request cancelled.");
4879 assert!(reports > 0);
4880 assert!(should_cancel.get());
4881 }
4882
4883 #[test]
4884 fn netdata_function_api_honors_cancellation_after_final_file_progress() {
4885 let dir = TempDir::new().expect("tempdir");
4886 write_netdata_test_journal(dir.path(), 10);
4887 let request = json!({
4888 "after": 1_700_000_000,
4889 "before": 1_700_000_010,
4890 "facets": ["SERVICE"],
4891 "histogram": "SERVICE",
4892 "last": 0
4893 });
4894 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4895 let should_cancel = Cell::new(false);
4896 let mut progress = |_progress: NetdataFunctionProgress| {
4897 should_cancel.set(true);
4898 };
4899 let is_cancelled = || should_cancel.get();
4900 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
4901 options.progress_callback = Some(&mut progress);
4902 options.cancellation_callback = Some(&is_cancelled);
4903
4904 let response = function
4905 .run_directory_request_json_with_options(dir.path(), &request, options)
4906 .expect("run function");
4907
4908 assert_eq!(response["status"], 499);
4909 assert_eq!(response["errorMessage"], "Request cancelled.");
4910 assert!(should_cancel.get());
4911 }
4912
4913 #[test]
4914 fn netdata_function_api_reports_timeout_as_partial_table() {
4915 let dir = TempDir::new().expect("tempdir");
4916 write_netdata_test_journal(dir.path(), 10);
4917 let request = json!({
4918 "after": 1_700_000_000,
4919 "before": 1_700_000_010,
4920 "facets": ["SERVICE"],
4921 "histogram": "SERVICE",
4922 "last": 0
4923 });
4924 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4925 let options = NetdataFunctionRunOptions {
4926 timeout: Some(Duration::ZERO),
4927 ..NetdataFunctionRunOptions::default()
4928 };
4929
4930 let response = function
4931 .run_directory_request_json_with_options(dir.path(), &request, options)
4932 .expect("run function");
4933
4934 assert_eq!(response["status"], 200);
4935 assert_eq!(response["partial"], true);
4936 assert_eq!(response["message"]["status"], "warning");
4937 assert_eq!(
4938 response["message"]["title"],
4939 "Query timed-out, incomplete data. "
4940 );
4941 }
4942
4943 #[test]
4944 fn netdata_function_api_reports_sampling_counters() {
4945 let dir = TempDir::new().expect("tempdir");
4946 write_stepped_netdata_test_journal(
4947 dir.path(),
4948 "netdata-api-test.journal",
4949 5_000,
4950 1_700_000_000_000_000,
4951 1_000,
4952 );
4953 let request = json!({
4954 "after": 1_700_000_000,
4955 "before": 1_700_000_005,
4956 "facets": ["SERVICE"],
4957 "histogram": "SERVICE",
4958 "last": 5,
4959 "sampling": 20
4960 });
4961 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
4962
4963 let response = function
4964 .run_directory_request_json_with_options(
4965 dir.path(),
4966 &request,
4967 NetdataFunctionRunOptions::from_timeout_seconds(0),
4968 )
4969 .expect("run function");
4970
4971 assert_eq!(response["status"], 200);
4972 assert!(
4973 response["_sampling"]["sampled"]
4974 .as_u64()
4975 .unwrap_or_default()
4976 > 0
4977 );
4978 assert!(
4979 response["_sampling"]["unsampled"]
4980 .as_u64()
4981 .unwrap_or_default()
4982 > 0
4983 );
4984 assert!(
4985 response["_sampling"]["estimated"]
4986 .as_u64()
4987 .unwrap_or_default()
4988 > 0
4989 );
4990 assert_eq!(
4991 response["items"]["estimated"],
4992 response["_sampling"]["estimated"]
4993 );
4994 assert!(
4995 response["items"]["unsampled"].as_u64().unwrap_or_default()
4996 < response["_sampling"]["unsampled"]
4997 .as_u64()
4998 .unwrap_or_default()
4999 );
5000 assert_eq!(response["message"]["status"], "notice");
5001 }
5002
5003 #[test]
5004 fn netdata_function_api_disables_sampling_for_data_only() {
5005 let dir = TempDir::new().expect("tempdir");
5006 write_netdata_test_journal(dir.path(), 5_000);
5007 let request = json!({
5008 "after": 1_700_000_000,
5009 "before": 1_700_000_010,
5010 "data_only": true,
5011 "last": 5,
5012 "sampling": 20
5013 });
5014 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5015
5016 let response = function
5017 .run_directory_request_json_with_options(
5018 dir.path(),
5019 &request,
5020 NetdataFunctionRunOptions::from_timeout_seconds(0),
5021 )
5022 .expect("run function");
5023
5024 assert_eq!(response["status"], 200);
5025 assert!(response.get("_sampling").is_none());
5026 }
5027
5028 #[test]
5029 fn normalizes_missing_time_window_to_last_hour_like_plugin() {
5030 assert_eq!(
5031 normalize_time_window(1_000_000_000, None, None),
5032 (Some(999_996_400_000_000), Some(1_000_000_000_999_999))
5033 );
5034 }
5035
5036 #[test]
5037 fn normalizes_inverted_time_window_like_plugin() {
5038 assert_eq!(
5039 normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000)),
5040 (Some(200_000_000_000_000), Some(200_000_100_999_999))
5041 );
5042 }
5043
5044 #[test]
5045 fn normalizes_equal_time_window_like_plugin() {
5046 assert_eq!(
5047 normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000)),
5048 (Some(199_996_400_000_000), Some(200_000_000_999_999))
5049 );
5050 }
5051
5052 #[test]
5053 fn normalizes_relative_time_window_like_plugin() {
5054 assert_eq!(
5055 normalize_time_window(1_000_000_000, Some(100), Some(200)),
5056 (Some(999_999_701_000_000), Some(999_999_800_999_999))
5057 );
5058 }
5059
5060 #[test]
5061 fn normalizes_missing_after_with_supplied_before_like_plugin() {
5062 assert_eq!(
5063 normalize_time_window(1_000_000_000, None, Some(200_000_000)),
5064 (Some(199_999_401_000_000), Some(200_000_000_999_999))
5065 );
5066 }
5067
5068 #[test]
5069 fn systemd_profile_transforms_priority_and_facility_for_display() {
5070 let profile = SystemdJournalProfile;
5071 let context = DisplayContext::default();
5072 assert_eq!(
5073 profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7"),
5074 json!("debug")
5075 );
5076 assert_eq!(
5077 profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3"),
5078 json!("daemon")
5079 );
5080 assert_eq!(priority_to_row_severity(b"3"), "critical");
5081 assert_eq!(priority_to_row_severity(b"6"), "normal");
5082 }
5083
5084 #[test]
5085 fn dynamic_process_name_matches_plugin_fallback_order() {
5086 let mut fields = BTreeMap::new();
5087 fields.insert("SYSLOG_IDENTIFIER".to_string(), vec![b"syslog".to_vec()]);
5088 fields.insert("_COMM".to_string(), vec![b"comm".to_vec()]);
5089 fields.insert("_PID".to_string(), vec![b"42".to_vec()]);
5090 fields.insert("SYSLOG_PID".to_string(), vec![b"99".to_vec()]);
5091 assert_eq!(dynamic_process_name(&fields), "syslog[42]");
5092
5093 fields.insert("CONTAINER_NAME".to_string(), vec![b"container".to_vec()]);
5094 assert_eq!(dynamic_process_name(&fields), "container[42]");
5095
5096 fields.remove("CONTAINER_NAME");
5097 fields.remove("SYSLOG_IDENTIFIER");
5098 fields.remove("_PID");
5099 assert_eq!(dynamic_process_name(&fields), "comm");
5100
5101 fields.remove("_COMM");
5102 fields.insert("_EXE".to_string(), vec![b"/usr/bin/app".to_vec()]);
5103 assert_eq!(dynamic_process_name(&fields), "-");
5104 }
5105
5106 #[test]
5107 fn facet_values_are_truncated_and_collapsed_like_plugin() {
5108 let prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
5109 let mut first = prefix.clone();
5110 first.extend_from_slice(b"-first");
5111 let mut second = prefix.clone();
5112 second.extend_from_slice(b"-second");
5113
5114 let mut values = BTreeMap::new();
5115 add_netdata_facet_count(&mut values, &first, 2);
5116 add_netdata_facet_count(&mut values, &second, 3);
5117
5118 assert_eq!(values.len(), 1);
5119 assert_eq!(values.get(&prefix), Some(&5));
5120 }
5121
5122 #[test]
5123 fn histogram_values_are_truncated_and_collapsed_like_plugin() {
5124 let prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
5125 let mut first = prefix.clone();
5126 first.extend_from_slice(b"-first");
5127 let mut second = prefix.clone();
5128 second.extend_from_slice(b"-second");
5129
5130 let mut values = HashMap::new();
5131 values.insert(first, 2);
5132 values.insert(second, 3);
5133 let histogram = ExplorerHistogram {
5134 field: b"TEST_FIELD".to_vec(),
5135 buckets: vec![ExplorerHistogramBucket {
5136 start_realtime_usec: 1_000_000,
5137 end_realtime_usec: 2_000_000,
5138 values,
5139 }],
5140 };
5141
5142 let function = NetdataJournalFunction::systemd_journal();
5143 let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);
5144 let labels = rendered["chart"]["result"]["labels"]
5145 .as_array()
5146 .expect("labels");
5147 assert_eq!(labels.len(), 2);
5148 assert_eq!(labels[1], Value::String(String::from_utf8(prefix).unwrap()));
5149 assert_eq!(rendered["chart"]["result"]["data"][0][1][0], json!(5));
5150 }
5151
5152 #[test]
5153 fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
5154 let mut backward = vec![
5155 test_located_row(100),
5156 test_located_row(100),
5157 test_located_row(100),
5158 test_located_row(90),
5159 ];
5160 make_row_timestamps_unique(&mut backward, Direction::Backward);
5161 assert_eq!(
5162 backward
5163 .iter()
5164 .map(|row| row.row.realtime_usec)
5165 .collect::<Vec<_>>(),
5166 vec![100, 99, 98, 90]
5167 );
5168
5169 let mut forward = vec![
5170 test_located_row(90),
5171 test_located_row(100),
5172 test_located_row(100),
5173 test_located_row(100),
5174 ];
5175 make_row_timestamps_unique(&mut forward, Direction::Forward);
5176 assert_eq!(
5177 forward
5178 .iter()
5179 .map(|row| row.row.realtime_usec)
5180 .collect::<Vec<_>>(),
5181 vec![90, 100, 101, 102]
5182 );
5183 }
5184
5185 #[test]
5186 fn page_window_counts_forward_anchor_like_netdata_facets() {
5187 let config = NetdataFunctionConfig::systemd_journal();
5188 let request = NetdataRequest::parse(
5189 &json!({
5190 "after": 1_700_000_000,
5191 "before": 1_700_000_010,
5192 "anchor": 1_700_000_005_000_000u64,
5193 "direction": "forward",
5194 "last": 2
5195 }),
5196 &config,
5197 )
5198 .expect("parse request");
5199 let mut window = NetdataPageWindow::for_request(&request);
5200
5201 for realtime_usec in [
5202 1_700_000_003_000_000,
5203 1_700_000_004_000_000,
5204 1_700_000_006_000_000,
5205 1_700_000_007_000_000,
5206 1_700_000_008_000_000,
5207 ] {
5208 window.observe(realtime_usec);
5209 }
5210
5211 let counters = window.counters();
5212 assert_eq!(counters.matched, 3);
5213 assert_eq!(counters.before, 1);
5214 assert_eq!(counters.after, 2);
5215 }
5216
5217 #[test]
5218 fn page_window_counts_backward_anchor_like_netdata_facets() {
5219 let config = NetdataFunctionConfig::systemd_journal();
5220 let request = NetdataRequest::parse(
5221 &json!({
5222 "after": 1_700_000_000,
5223 "before": 1_700_000_010,
5224 "anchor": 1_700_000_008_000_000u64,
5225 "direction": "backward",
5226 "last": 2
5227 }),
5228 &config,
5229 )
5230 .expect("parse request");
5231 let mut window = NetdataPageWindow::for_request(&request);
5232
5233 for realtime_usec in [
5234 1_700_000_009_000_000,
5235 1_700_000_007_000_000,
5236 1_700_000_006_000_000,
5237 1_700_000_005_000_000,
5238 ] {
5239 window.observe(realtime_usec);
5240 }
5241
5242 let counters = window.counters();
5243 assert_eq!(counters.matched, 3);
5244 assert_eq!(counters.before, 1);
5245 assert_eq!(counters.after, 1);
5246 }
5247
5248 #[test]
5249 fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
5250 let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);
5251
5252 assert_eq!(adjuster.adjust(10), 10);
5253 assert_eq!(adjuster.adjust(10), 11);
5254 assert_eq!(adjuster.adjust(10), 12);
5255 }
5256
5257 #[test]
5258 fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
5259 let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);
5260
5261 assert_eq!(adjuster.adjust(10), 10);
5262 assert_eq!(adjuster.adjust(10), 9);
5263 assert_eq!(adjuster.adjust(10), 8);
5264 }
5265
5266 #[test]
5267 fn systemd_profile_keeps_user_group_ids_raw_by_default() {
5268 let context = DisplayContext::default();
5269 let profile = SystemdJournalProfile;
5270 assert_eq!(
5271 profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5272 json!("0")
5273 );
5274 assert_eq!(
5275 profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5276 json!("0")
5277 );
5278 }
5279
5280 #[cfg(unix)]
5281 #[test]
5282 fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
5283 let context = DisplayContext::default();
5284 let profile = SystemdJournalPluginProfile;
5285 assert_eq!(
5286 profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5287 json!("root")
5288 );
5289 assert_eq!(
5290 profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5291 json!("root")
5292 );
5293 }
5294
5295 #[cfg(unix)]
5296 #[test]
5297 fn plugin_compatible_profile_caches_user_group_resolution() {
5298 let context = DisplayContext::default();
5299 let profile = SystemdJournalPluginProfile;
5300
5301 assert_eq!(
5302 profile.field_display_value(&context, DisplayScope::Facet, "_UID", b"0"),
5303 json!("root")
5304 );
5305 assert_eq!(
5306 profile.field_display_value(&context, DisplayScope::Data, "_UID", b"0"),
5307 json!("root")
5308 );
5309 assert_eq!(
5310 profile.field_display_value(&context, DisplayScope::Facet, "_GID", b"0"),
5311 json!("root")
5312 );
5313 assert_eq!(
5314 profile.field_display_value(&context, DisplayScope::Data, "_GID", b"0"),
5315 json!("root")
5316 );
5317
5318 assert_eq!(context.uid_display_cache.borrow().len(), 1);
5319 assert_eq!(context.gid_display_cache.borrow().len(), 1);
5320 }
5321
5322 #[test]
5323 fn file_overlap_uses_netdata_max_realtime_slack() {
5324 let file_first_seconds = 200_000_000u64;
5325 let file_last_seconds = 200_000_100u64;
5326 let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
5327 let header = crate::FileHeader {
5328 signature: *b"LPKSHHRH",
5329 compatible_flags: 0,
5330 incompatible_flags: 0,
5331 state: 0,
5332 header_size: 0,
5333 n_entries: 0,
5334 head_entry_realtime: file_first_seconds * 1_000_000,
5335 tail_entry_realtime: file_last_seconds * 1_000_000,
5336 head_entry_seqnum: 0,
5337 tail_entry_seqnum: 0,
5338 tail_entry_boot_id: [0; 16],
5339 seqnum_id: [0; 16],
5340 };
5341 let config = NetdataFunctionConfig::systemd_journal();
5342
5343 let inside_slack = NetdataRequest::parse(
5344 &json!({
5345 "after": file_last_seconds + slack_seconds - 1,
5346 "before": file_last_seconds + slack_seconds + 500
5347 }),
5348 &config,
5349 )
5350 .expect("parse request");
5351 assert!(file_may_overlap_request(header, &inside_slack));
5352
5353 let outside_slack = NetdataRequest::parse(
5354 &json!({
5355 "after": file_last_seconds + slack_seconds + 1,
5356 "before": file_last_seconds + slack_seconds + 500
5357 }),
5358 &config,
5359 )
5360 .expect("parse request");
5361 assert!(!file_may_overlap_request(header, &outside_slack));
5362 }
5363
5364 #[test]
5365 fn journal_file_order_matches_plugin_comparator_shape() {
5366 let older = JournalFileOrderInfo {
5367 msg_first_realtime_usec: 100,
5368 msg_last_realtime_usec: 200,
5369 file_last_modified_usec: 200,
5370 journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5371 };
5372 let newer = JournalFileOrderInfo {
5373 msg_first_realtime_usec: 100,
5374 msg_last_realtime_usec: 300,
5375 file_last_modified_usec: 100,
5376 journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5377 };
5378 assert_eq!(
5379 compare_journal_file_order(&newer, &older, Direction::Backward),
5380 Ordering::Less
5381 );
5382 assert_eq!(
5383 compare_journal_file_order(&newer, &older, Direction::Forward),
5384 Ordering::Greater
5385 );
5386
5387 let newer_mtime = JournalFileOrderInfo {
5388 msg_first_realtime_usec: 100,
5389 msg_last_realtime_usec: 200,
5390 file_last_modified_usec: 300,
5391 journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5392 };
5393 assert_eq!(
5394 compare_journal_file_order(&newer_mtime, &older, Direction::Backward),
5395 Ordering::Less
5396 );
5397
5398 let newer_first = JournalFileOrderInfo {
5399 msg_first_realtime_usec: 150,
5400 msg_last_realtime_usec: 200,
5401 file_last_modified_usec: 200,
5402 journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
5403 };
5404 assert_eq!(
5405 compare_journal_file_order(&newer_first, &older, Direction::Backward),
5406 Ordering::Less
5407 );
5408 }
5409
5410 #[test]
5411 fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
5412 let mut boot_first = BTreeMap::new();
5413 record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 300);
5414 record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 100);
5415 record_boot_first_realtime(&mut boot_first, b"boot-a".to_vec(), 200);
5416
5417 assert_eq!(boot_first.get(b"boot-a".as_slice()), Some(&100));
5418 }
5419
5420 #[test]
5421 fn merge_histogram_rejects_inconsistent_bucket_shape() {
5422 let mut target = Some(ExplorerHistogram {
5423 field: b"PRIORITY".to_vec(),
5424 buckets: vec![ExplorerHistogramBucket {
5425 start_realtime_usec: 1_000_000,
5426 end_realtime_usec: 2_000_000,
5427 values: HashMap::new(),
5428 }],
5429 });
5430 let source = ExplorerHistogram {
5431 field: b"PRIORITY".to_vec(),
5432 buckets: vec![ExplorerHistogramBucket {
5433 start_realtime_usec: 1_000_000,
5434 end_realtime_usec: 1_500_000,
5435 values: HashMap::new(),
5436 }],
5437 };
5438
5439 let err = merge_histogram(&mut target, source).expect_err("shape mismatch");
5440 assert!(matches!(
5441 err,
5442 SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
5443 ));
5444 }
5445
5446 #[test]
5447 fn collect_journal_files_recurses_nested_directories() {
5448 let dir = TempDir::new().expect("tempdir");
5449 let nested = dir.path().join("machine").join("nested");
5450 write_named_netdata_test_journal(&nested, "system.journal", 1, 1_700_000_000_000_000);
5451
5452 let collection = collect_journal_files(dir.path()).expect("collect files");
5453
5454 assert_eq!(collection.files.len(), 1);
5455 assert_eq!(collection.skipped, 0);
5456 assert!(collection.errors.is_empty());
5457 assert_eq!(
5458 collection.files[0]
5459 .file_name()
5460 .and_then(|name| name.to_str()),
5461 Some("system.journal")
5462 );
5463 }
5464
5465 #[cfg(unix)]
5466 #[test]
5467 fn collect_journal_files_deduplicates_symlinked_directories() {
5468 let dir = TempDir::new().expect("tempdir");
5469 let real = dir.path().join("real");
5470 let link = dir.path().join("link");
5471 write_named_netdata_test_journal(&real, "system.journal", 1, 1_700_000_000_000_000);
5472 std::os::unix::fs::symlink(&real, &link).expect("symlink");
5473
5474 let collection = collect_journal_files(dir.path()).expect("collect files");
5475
5476 assert_eq!(collection.files.len(), 1);
5477 assert_eq!(collection.skipped, 0);
5478 assert!(collection.errors.is_empty());
5479 }
5480
5481 #[cfg(unix)]
5482 #[test]
5483 fn collect_journal_files_deduplicates_symlinked_files() {
5484 let dir = TempDir::new().expect("tempdir");
5485 write_named_netdata_test_journal(dir.path(), "system.journal", 1, 1_700_000_000_000_000);
5486 std::os::unix::fs::symlink(
5487 dir.path().join("system.journal"),
5488 dir.path().join("system-copy.journal"),
5489 )
5490 .expect("symlink");
5491
5492 let collection = collect_journal_files(dir.path()).expect("collect files");
5493
5494 assert_eq!(collection.files.len(), 1);
5495 assert_eq!(collection.skipped, 0);
5496 assert!(collection.errors.is_empty());
5497 }
5498
5499 #[cfg(unix)]
5500 #[test]
5501 fn collect_journal_files_reports_unreadable_subdirectories() {
5502 use std::os::unix::fs::PermissionsExt;
5503
5504 if unsafe { libc::geteuid() } == 0 {
5505 return;
5506 }
5507
5508 let dir = TempDir::new().expect("tempdir");
5509 std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
5510 let locked = dir.path().join("locked");
5511 std::fs::create_dir(&locked).expect("locked dir");
5512 std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
5513 .expect("lock dir");
5514
5515 let collection = collect_journal_files(dir.path()).expect("collect files");
5516
5517 std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
5518 .expect("unlock dir");
5519 assert_eq!(collection.files.len(), 1);
5520 assert_eq!(collection.skipped, 1);
5521 assert_eq!(collection.errors.len(), 1);
5522 assert!(collection.errors[0].contains("locked"));
5523 }
5524
5525 #[test]
5526 fn source_summary_fills_missing_caller_metadata_from_header() {
5527 let dir = TempDir::new().expect("tempdir");
5528 write_named_netdata_test_journal(dir.path(), "system.journal", 2, 1_700_000_000_000_000);
5529 let path = dir.path().join("system.journal");
5530 let mut state = TestNetdataState::default();
5531 state.metadata.insert(
5532 path.clone(),
5533 NetdataJournalFileMetadata {
5534 msg_first_realtime_usec: Some(1_699_999_999_000_000),
5535 ..NetdataJournalFileMetadata::default()
5536 },
5537 );
5538 let options = NetdataFunctionRunOptions {
5539 state: Some(&mut state),
5540 ..NetdataFunctionRunOptions::default()
5541 };
5542
5543 let summary = JournalSourceSummary::from_paths(&[path], ReaderOptions::default(), &options);
5544
5545 assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
5546 assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
5547 }
5548
5549 #[test]
5550 fn source_selection_echoes_and_filters_known_groups() {
5551 let config = NetdataFunctionConfig::systemd_journal();
5552 let request = NetdataRequest::parse(
5553 &json!({
5554 "selections": {
5555 "__logs_sources": ["all-local-system-logs"]
5556 }
5557 }),
5558 &config,
5559 )
5560 .expect("parse source-filtered request");
5561
5562 assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
5563 assert_eq!(
5564 request.echo.get("source_type").and_then(Value::as_u64),
5565 Some(SOURCE_TYPE_LOCAL_SYSTEM)
5566 );
5567 assert!(
5568 request
5569 .echo
5570 .pointer("/selections/__logs_sources/0")
5571 .is_some_and(Value::is_null)
5572 );
5573 assert!(request.matches_source(Path::new("/var/log/journal/machine/system.journal"), None));
5574 assert!(!request.matches_source(
5575 Path::new("/var/log/journal/machine/user-1000.journal"),
5576 None
5577 ));
5578 }
5579
5580 #[test]
5581 fn source_selection_uses_caller_metadata_before_filename_fallback() {
5582 let dir = TempDir::new().expect("tempdir");
5583 write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
5584 let path = dir.path().join("user-1000.journal");
5585 let config = NetdataFunctionConfig::systemd_journal();
5586 let request = NetdataRequest::parse(
5587 &json!({
5588 "after": 1_700_000_000,
5589 "before": 1_700_000_001,
5590 "selections": {
5591 "__logs_sources": ["all-local-system-logs"]
5592 }
5593 }),
5594 &config,
5595 )
5596 .expect("parse source-filtered request");
5597 assert!(!request.matches_source(&path, None));
5598
5599 let mut state = TestNetdataState::default();
5600 state.metadata.insert(
5601 path.clone(),
5602 NetdataJournalFileMetadata {
5603 source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
5604 source_name: Some("system-registry".to_string()),
5605 ..NetdataJournalFileMetadata::default()
5606 },
5607 );
5608 let options = NetdataFunctionRunOptions {
5609 state: Some(&mut state),
5610 ..NetdataFunctionRunOptions::default()
5611 };
5612 let selected =
5613 select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
5614
5615 assert_eq!(selected.files.len(), 1);
5616 }
5617
5618 #[test]
5619 fn netdata_function_state_receives_learned_source_realtime_delta() {
5620 let dir = TempDir::new().expect("tempdir");
5621 let commit_realtime_usec = 1_700_000_030_000_000;
5622 let source_realtime_usec = commit_realtime_usec - 30_000_000;
5623 write_source_realtime_delta_journal(
5624 dir.path(),
5625 "system.journal",
5626 commit_realtime_usec,
5627 source_realtime_usec,
5628 );
5629 let request = json!({
5630 "after": 1_700_000_029,
5631 "before": 1_700_000_031,
5632 "facets": ["SERVICE"],
5633 "histogram": "SERVICE",
5634 "last": 1,
5635 "sampling": 0
5636 });
5637 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5638 let mut state = TestNetdataState::default();
5639 let options = NetdataFunctionRunOptions {
5640 state: Some(&mut state),
5641 ..NetdataFunctionRunOptions::from_timeout_seconds(0)
5642 };
5643
5644 let response = function
5645 .run_directory_request_json_with_options(dir.path(), &request, options)
5646 .expect("run function");
5647
5648 assert_eq!(response["status"], 200);
5649 assert_eq!(state.updates.len(), 1);
5650 assert_eq!(state.updates[0].1, 30_000_000);
5651 }
5652
5653 #[test]
5654 fn source_classification_matches_plugin_filename_shape() {
5655 assert_eq!(
5656 journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
5657 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
5658 );
5659 assert_eq!(
5660 journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
5661 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
5662 );
5663 assert_eq!(
5664 journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
5665 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
5666 );
5667 assert_eq!(
5668 journal_file_source_type(Path::new(
5669 "/var/log/journal/machine.namespace/system.journal"
5670 )),
5671 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
5672 );
5673 assert_eq!(
5674 journal_file_source_type(Path::new(
5675 "/var/log/journal/remote/remote-host-a@machine.journal"
5676 )),
5677 SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
5678 );
5679 }
5680
5681 #[test]
5682 fn exact_source_names_follow_plugin_prefixes() {
5683 assert_eq!(
5684 journal_file_exact_source_name(Path::new(
5685 "/var/log/journal/machine.namespace/system.journal"
5686 ))
5687 .as_deref(),
5688 Some("namespace-namespace")
5689 );
5690 assert_eq!(
5691 journal_file_exact_source_name(Path::new(
5692 "/var/log/journal/remote/remote-host-a@machine.journal"
5693 ))
5694 .as_deref(),
5695 Some("remote-host-a")
5696 );
5697 assert_eq!(
5698 journal_file_exact_source_name(Path::new(
5699 "/var/log/journal/remote/remote-host-b.journal~.zst"
5700 ))
5701 .as_deref(),
5702 Some("remote-host-b")
5703 );
5704 }
5705
5706 #[test]
5707 fn disposed_journal_extension_matches_plugin_scan_contract() {
5708 assert!(is_journal_file_name(Path::new("active.journal")));
5709 assert!(is_journal_file_name(Path::new("archived.journal~")));
5710 assert!(is_journal_file_name(Path::new("active.journal.zst")));
5711 assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
5712 }
5713
5714 fn test_located_row(realtime_usec: u64) -> LocatedRow {
5715 LocatedRow {
5716 file_path: PathBuf::from("test.journal"),
5717 row: ExplorerRow {
5718 realtime_usec,
5719 cursor: String::new(),
5720 payloads: Vec::new(),
5721 },
5722 }
5723 }
5724
5725 fn write_source_realtime_delta_journal(
5726 directory: &std::path::Path,
5727 name: &str,
5728 commit_realtime_usec: u64,
5729 source_realtime_usec: u64,
5730 ) {
5731 std::fs::create_dir_all(directory).expect("create journal dir");
5732 let path = directory.join(name);
5733 let repo_file = RepoFile::from_path(&path).expect("repo file");
5734 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
5735 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
5736 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
5737 let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
5738 let payloads: [&[u8]; 4] = [
5739 b"MESSAGE=source lag test".as_slice(),
5740 b"SERVICE=delta".as_slice(),
5741 b"PRIORITY=6".as_slice(),
5742 source.as_bytes(),
5743 ];
5744 writer
5745 .add_entry(
5746 &mut file,
5747 &payloads,
5748 commit_realtime_usec,
5749 commit_realtime_usec,
5750 )
5751 .expect("write entry");
5752 file.sync().expect("sync journal");
5753 }
5754}