1use crate::explorer::{ExplorerSamplingState, histogram_bucket_count_for_query};
2use crate::{
3 Direction, ExplorerAnchor, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
4 ExplorerFtsPattern, ExplorerHistogram, ExplorerProgress, ExplorerQuery, ExplorerResult,
5 ExplorerRow, ExplorerSampling, ExplorerStats, ExplorerStopReason, ExplorerStrategy, FileHeader,
6 FileReader, ReaderOptions, Result, SdkError,
7};
8use chrono::{DateTime, Utc};
9use serde_json::{Map, Value, json};
10use std::cell::RefCell;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet, VecDeque};
13#[cfg(unix)]
14use std::ffi::CStr;
15use std::path::{Path, PathBuf};
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
// Tunables and protocol constants for the Netdata journal function. Several of
// them are consumed outside the portion of the file these notes were written
// against, so only what the names, values, and nearby uses show is described.

// Function name stored in the built-in systemd-journal configuration.
const DEFAULT_FUNCTION_NAME: &str = "systemd-journal";
const DEFAULT_ITEMS_TO_RETURN: usize = 200;
// 3600 seconds, i.e. one hour.
const DEFAULT_TIME_WINDOW_SECONDS: i64 = 3600;
const DEFAULT_ITEMS_SAMPLING: u64 = 1_000_000;
const DATA_ONLY_CHECK_EVERY_ROWS: u64 = 128;
// Three 365-day years, in seconds.
// NOTE(review): presumably the bound used to tell relative from absolute
// `after`/`before` values — confirm at the use site.
const API_RELATIVE_TIME_MAX_SECONDS: i64 = 3 * 365 * 86_400;
const NETDATA_MISSING_AFTER_RELATIVE_SECONDS: i64 = 600;
const DEFAULT_HISTOGRAM_BUCKETS: usize = 150;
// One hundred 365-day years, in seconds. Substituted for a timeout of zero by
// `NetdataFunctionRunOptions::from_timeout_seconds` and used as the default
// timeout, so the timeout effectively never fires.
const EFFECTIVELY_DISABLED_TIMEOUT_SECONDS: u64 = 100 * 365 * 24 * 60 * 60;
// 5 seconds in microseconds; passed to `to_explorer_query` when the shared
// query is built in `explore_files`.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC: u64 = 5_000_000;
// 2 minutes in microseconds.
const NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC: u64 = 2 * 60 * 1_000_000;
// `id` and `name` reported by `build_facets` for a zero-count facet option
// whose raw value is empty.
const NETDATA_EMPTY_STRING_FACET_HASH_ID: &str = "CzGfAU2z3TC";
const NETDATA_UNAVAILABLE_FIELD_LABEL: &str = "[unavailable field]";
const NETDATA_FACET_MAX_VALUE_LENGTH: usize = 8192;
const NETDATA_MAX_DIRECTORY_SCAN_DEPTH: usize = 64;
const NETDATA_MAX_DIRECTORY_SCAN_COUNT: usize = 8192;
// Journal source-type flags. Each constant is a distinct single bit, so
// several of them can be combined in one `u64`.
const SOURCE_TYPE_ALL: u64 = 1 << 0;
const SOURCE_TYPE_LOCAL_ALL: u64 = 1 << 1;
const SOURCE_TYPE_REMOTE_ALL: u64 = 1 << 2;
const SOURCE_TYPE_LOCAL_SYSTEM: u64 = 1 << 3;
const SOURCE_TYPE_LOCAL_USER: u64 = 1 << 4;
const SOURCE_TYPE_LOCAL_NAMESPACE: u64 = 1 << 5;
const SOURCE_TYPE_LOCAL_OTHER: u64 = 1 << 6;

// Public aliases of the flags above, with identical values.
// NOTE(review): presumably the values expected in
// `NetdataJournalFileMetadata::source_type` — confirm against its consumers.
pub const NETDATA_SOURCE_TYPE_ALL: u64 = SOURCE_TYPE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_ALL: u64 = SOURCE_TYPE_LOCAL_ALL;
pub const NETDATA_SOURCE_TYPE_REMOTE_ALL: u64 = SOURCE_TYPE_REMOTE_ALL;
pub const NETDATA_SOURCE_TYPE_LOCAL_SYSTEM: u64 = SOURCE_TYPE_LOCAL_SYSTEM;
pub const NETDATA_SOURCE_TYPE_LOCAL_USER: u64 = SOURCE_TYPE_LOCAL_USER;
pub const NETDATA_SOURCE_TYPE_LOCAL_NAMESPACE: u64 = SOURCE_TYPE_LOCAL_NAMESPACE;
pub const NETDATA_SOURCE_TYPE_LOCAL_OTHER: u64 = SOURCE_TYPE_LOCAL_OTHER;
49
// Parameter names that are always advertised as accepted.
// `accepted_params_from_fields` emits these first, in this order, and then
// appends the field names it is given.
const NETDATA_ACCEPTED_PARAMS: &[&str] = &[
    "info",
    "__logs_sources",
    "after",
    "before",
    "anchor",
    "direction",
    "last",
    "query",
    "facets",
    "histogram",
    "if_modified_since",
    "data_only",
    "delta",
    "tail",
    "sampling",
    "slice",
];
68
// View keys of the built-in systemd-journal configuration:
// `NetdataFunctionConfig::systemd_journal` copies them into
// `default_view_keys`, and `build_columns` places them, in this order, right
// after the fixed `timestamp` and `rowOptions` columns.
const SYSTEMD_DEFAULT_VIEW_KEYS: &[&str] = &[
    "_HOSTNAME",
    "ND_JOURNAL_PROCESS",
    "MESSAGE",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "ND_JOURNAL_FILE",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_CAP_EFFECTIVE",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "_SOURCE_REALTIME_TIMESTAMP",
];
93
// Facet fields of the built-in systemd-journal configuration:
// `NetdataFunctionConfig::systemd_journal` copies them into `default_facets`.
const SYSTEMD_DEFAULT_FACETS: &[&str] = &[
    "_HOSTNAME",
    "PRIORITY",
    "SYSLOG_FACILITY",
    "ERRNO",
    "SYSLOG_IDENTIFIER",
    "UNIT",
    "USER_UNIT",
    "MESSAGE_ID",
    "_BOOT_ID",
    "_SYSTEMD_OWNER_UID",
    "_UID",
    "OBJECT_SYSTEMD_OWNER_UID",
    "OBJECT_UID",
    "_GID",
    "OBJECT_GID",
    "_AUDIT_LOGINUID",
    "OBJECT_AUDIT_LOGINUID",
    "CODE_FILE",
    "_SYSTEMD_UNIT",
    "_SYSTEMD_USER_SLICE",
    "CODE_FUNC",
    "_TRANSPORT",
    "_COMM",
    "_RUNTIME_SCOPE",
    "_MACHINE_ID",
    "_SYSTEMD_SLICE",
    "UNIT_RESULT",
    "_SYSTEMD_CGROUP",
    "_EXE",
    "_SYSTEMD_USER_UNIT",
    "_SYSTEMD_SESSION",
    "COREDUMP_CGROUP",
    "COREDUMP_USER_UNIT",
    "COREDUMP_UNIT",
    "COREDUMP_SIGNAL_NAME",
    "COREDUMP_COMM",
    "_UDEV_DEVNODE",
    "_KERNEL_SUBSYSTEM",
    "OBJECT_EXE",
    "OBJECT_SYSTEMD_CGROUP",
    "OBJECT_COMM",
    "OBJECT_SYSTEMD_UNIT",
    "OBJECT_SYSTEMD_USER_UNIT",
    "_SELINUX_CONTEXT",
    "_NAMESPACE",
    "OBJECT_SYSTEMD_SESSION",
    "CONTAINER_ID",
    "CONTAINER_NAME",
    "CONTAINER_TAG",
    "IMAGE_NAME",
    "ND_NIDL_NODE",
    "ND_NIDL_CONTEXT",
    "ND_LOG_SOURCE",
    "ND_ALERT_NAME",
    "ND_ALERT_CLASS",
    "ND_ALERT_COMPONENT",
    "ND_ALERT_TYPE",
    "ND_ALERT_STATUS",
];
154
/// Static configuration of a Netdata journal function.
#[derive(Debug, Clone)]
pub struct NetdataFunctionConfig {
    /// Name of the function (`"systemd-journal"` in the built-in configuration).
    pub function_name: String,
    /// Default facet field names.
    pub default_facets: Vec<String>,
    /// Field names that `build_columns` places right after the fixed
    /// `timestamp` and `rowOptions` columns.
    pub default_view_keys: Vec<String>,
    /// Default histogram field, if any (`PRIORITY` in the built-in configuration).
    pub default_histogram: Option<String>,
    /// Options passed to `FileReader::open_with_options` and to the other
    /// helpers that read journal files.
    pub reader_options: ReaderOptions,
    /// Strategy handed to the file reader when a file is explored.
    pub explorer_strategy: ExplorerStrategy,
}
164
165impl NetdataFunctionConfig {
166 pub fn systemd_journal() -> Self {
167 Self {
168 function_name: DEFAULT_FUNCTION_NAME.to_string(),
169 default_facets: SYSTEMD_DEFAULT_FACETS
170 .iter()
171 .map(|field| (*field).to_string())
172 .collect(),
173 default_view_keys: SYSTEMD_DEFAULT_VIEW_KEYS
174 .iter()
175 .map(|field| (*field).to_string())
176 .collect(),
177 default_histogram: Some("PRIORITY".to_string()),
178 reader_options: ReaderOptions::snapshot(),
179 explorer_strategy: ExplorerStrategy::Traversal,
180 }
181 }
182}
183
184impl Default for NetdataFunctionConfig {
185 fn default() -> Self {
186 Self::systemd_journal()
187 }
188}
189
/// Lookup data handed to a [`NetdataFunctionProfile`] while it renders
/// display values.
#[derive(Debug, Default)]
pub struct DisplayContext {
    // Filled from `collect_boot_first_realtime` when a query response is built.
    // NOTE(review): presumably boot id -> first realtime timestamp seen for
    // that boot — confirm against `collect_boot_first_realtime`.
    boot_first_realtime: BTreeMap<Vec<u8>, u64>,
    // Interior-mutable string -> string caches.
    // NOTE(review): presumably memoized uid/gid display names; they are
    // populated outside the code visible here — confirm.
    uid_display_cache: RefCell<BTreeMap<String, String>>,
    gid_display_cache: RefCell<BTreeMap<String, String>>,
}
196
/// Tells a profile which part of the response a display value is rendered for.
#[derive(Debug, Clone, Copy)]
pub enum DisplayScope {
    /// A cell of a data row (used by `build_data_rows`).
    Data,
    /// The name of a facet option (used by `facet_option_name`).
    Facet,
    /// The name of a histogram dimension (used by `histogram_chart_metadata`).
    Histogram,
}
203
204pub trait NetdataFunctionProfile {
205 fn field_display_value(
206 &self,
207 _context: &DisplayContext,
208 _scope: DisplayScope,
209 _field: &str,
210 value: &[u8],
211 ) -> Value {
212 Value::String(String::from_utf8_lossy(value).into_owned())
213 }
214
215 fn facet_option_name(&self, context: &DisplayContext, field: &str, raw_value: &[u8]) -> String {
216 match self.field_display_value(context, DisplayScope::Facet, field, raw_value) {
217 Value::String(value) => value,
218 other => other.to_string(),
219 }
220 }
221
222 fn row_options(&self, fields: &BTreeMap<String, Vec<Vec<u8>>>) -> Value {
223 if let Some(priority) = first_value(fields, "PRIORITY") {
224 return json!({ "severity": priority_to_row_severity(priority) });
225 }
226 json!({ "severity": "normal" })
227 }
228}
229
/// Profile that renders values through `systemd_field_display_value` with its
/// last argument set to `false`. This is the default profile type parameter
/// of `NetdataJournalFunction`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalProfile;
232
/// Profile that renders values through `systemd_field_display_value` with its
/// last argument set to `true`; used by
/// `NetdataJournalFunction::systemd_journal_plugin_compatible`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemdJournalPluginProfile;
235
impl NetdataFunctionProfile for SystemdJournalProfile {
    /// Delegates to `systemd_field_display_value`, passing `false` as the
    /// final flag.
    // NOTE(review): the meaning of that flag is defined by
    // `systemd_field_display_value`, which is outside this view; judging by
    // the sibling profile it presumably selects plugin-compatible output.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, false)
    }
}
247
impl NetdataFunctionProfile for SystemdJournalPluginProfile {
    /// Delegates to `systemd_field_display_value`, passing `true` as the
    /// final flag.
    // NOTE(review): the meaning of that flag is defined by
    // `systemd_field_display_value`, which is outside this view; judging by
    // this type's name it presumably selects plugin-compatible output.
    fn field_display_value(
        &self,
        context: &DisplayContext,
        scope: DisplayScope,
        field: &str,
        value: &[u8],
    ) -> Value {
        systemd_field_display_value(context, scope, field, value, true)
    }
}
259
/// A Netdata journal function: a configuration paired with the profile that
/// decides how values are displayed. The profile type defaults to
/// [`SystemdJournalProfile`].
#[derive(Debug, Clone)]
pub struct NetdataJournalFunction<P = SystemdJournalProfile> {
    // Static configuration (default fields, reader options, explorer strategy).
    config: NetdataFunctionConfig,
    // Display hooks used while building rows, facets and histograms.
    profile: P,
}
265
/// Progress snapshot delivered to
/// `NetdataFunctionRunOptions::progress_callback`.
// NOTE(review): the values are filled in by helpers outside this view; the
// field notes below follow the field names and should be confirmed there.
#[derive(Debug, Clone)]
pub struct NetdataFunctionProgress {
    /// Position of the file being processed.
    pub current_file: usize,
    /// Number of files selected for the run.
    pub total_files: usize,
    /// Files opened and explored so far.
    pub matched_files: u64,
    /// Files skipped so far.
    pub skipped_files: u64,
    /// Explorer statistics.
    pub stats: ExplorerStats,
    /// Time elapsed.
    pub elapsed: Duration,
}
275
/// Per-file metadata that a [`NetdataFunctionState`] may supply for a journal
/// file. Every field is optional; `Default` leaves all of them `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetdataJournalFileMetadata {
    // NOTE(review): presumably a combination of the `NETDATA_SOURCE_TYPE_*`
    // flags — confirm against the consumers.
    pub source_type: Option<u64>,
    pub source_name: Option<String>,
    // The `_usec` suffix indicates microseconds.
    pub file_last_modified_usec: Option<u64>,
    pub msg_first_realtime_usec: Option<u64>,
    pub msg_last_realtime_usec: Option<u64>,
    pub journal_vs_realtime_delta_usec: Option<u64>,
}
285
/// Optional caller-provided state consulted during a run. Both methods have
/// do-nothing defaults, so an implementor only overrides what it supports.
pub trait NetdataFunctionState {
    /// Returns known metadata for the journal file at `_path`.
    /// The default knows nothing and returns `None`.
    fn file_metadata(&self, _path: &Path) -> Option<NetdataJournalFileMetadata> {
        None
    }

    /// Receives a journal-vs-realtime delta (in microseconds) for the file at
    /// `_path`. The default ignores it.
    fn update_file_journal_vs_realtime_delta_usec(&mut self, _path: &Path, _delta_usec: u64) {}
}
293
/// Per-run options for executing a Netdata journal function.
pub struct NetdataFunctionRunOptions<'a> {
    /// Maximum run time; `run_directory_request_json_with_options` converts it
    /// into a deadline measured from the moment exploration starts.
    pub timeout: Option<Duration>,
    /// Optional sink for progress snapshots.
    pub progress_callback: Option<&'a mut dyn FnMut(NetdataFunctionProgress)>,
    /// Optional callback that is handed to the explorer as its cancellation
    /// callback.
    // NOTE(review): presumably returning `true` requests cancellation — confirm.
    pub cancellation_callback: Option<&'a dyn Fn() -> bool>,
    /// Optional caller-provided state (see [`NetdataFunctionState`]).
    pub state: Option<&'a mut dyn NetdataFunctionState>,
    /// Progress interval handed to the explorer.
    pub progress_interval: Duration,
}
301
302impl NetdataFunctionRunOptions<'_> {
303 pub fn from_timeout_seconds(seconds: u64) -> Self {
304 let seconds = if seconds == 0 {
305 EFFECTIVELY_DISABLED_TIMEOUT_SECONDS
306 } else {
307 seconds
308 };
309 Self {
310 timeout: Some(Duration::from_secs(seconds)),
311 progress_callback: None,
312 cancellation_callback: None,
313 state: None,
314 progress_interval: Duration::from_millis(250),
315 }
316 }
317}
318
319impl Default for NetdataFunctionRunOptions<'_> {
320 fn default() -> Self {
321 Self {
322 timeout: Some(Duration::from_secs(EFFECTIVELY_DISABLED_TIMEOUT_SECONDS)),
323 progress_callback: None,
324 cancellation_callback: None,
325 state: None,
326 progress_interval: Duration::from_millis(250),
327 }
328 }
329}
330
331impl NetdataJournalFunction<SystemdJournalProfile> {
332 pub fn systemd_journal() -> Self {
333 Self {
334 config: NetdataFunctionConfig::systemd_journal(),
335 profile: SystemdJournalProfile,
336 }
337 }
338}
339
340impl NetdataJournalFunction<SystemdJournalPluginProfile> {
341 pub fn systemd_journal_plugin_compatible() -> Self {
342 Self {
343 config: NetdataFunctionConfig::systemd_journal(),
344 profile: SystemdJournalPluginProfile,
345 }
346 }
347}
348
349impl<P> NetdataJournalFunction<P>
350where
351 P: NetdataFunctionProfile,
352{
    /// Creates a function from an explicit configuration and display profile.
    pub fn new(config: NetdataFunctionConfig, profile: P) -> Self {
        Self { config, profile }
    }
356
357 pub fn run_directory_request_json(&self, directory: &Path, request: &Value) -> Result<Value> {
358 self.run_directory_request_json_with_options(
359 directory,
360 request,
361 NetdataFunctionRunOptions::default(),
362 )
363 }
364
365 pub fn run_directory_request_json_with_options(
366 &self,
367 directory: &Path,
368 request: &Value,
369 mut options: NetdataFunctionRunOptions<'_>,
370 ) -> Result<Value> {
371 let request = NetdataRequest::parse(request, &self.config)?;
372 let collection = collect_journal_files(directory)?;
373 let paths = collection.files;
374 if request.info {
375 return Ok(self.info_response(request.echo, &paths, &options));
376 }
377 let annotation_paths = paths.clone();
378
379 let selected =
380 select_journal_files_for_request(paths, &request, self.config.reader_options, &options);
381 if let Some(response) = not_modified_before_scan_response(&request, &selected) {
382 return Ok(response);
383 }
384 let selected_files = selected.files;
385 let deadline = options.timeout.map(|timeout| Instant::now() + timeout);
386 let mut combined = self.explore_files(&selected_files, &request, deadline, &mut options)?;
387 self.finalize_combined_result(
388 &request,
389 &selected_files,
390 deadline,
391 &mut options,
392 &mut combined,
393 collection.skipped,
394 collection.errors,
395 )?;
396 Ok(self.query_response(request, &annotation_paths, combined))
397 }
398
399 fn finalize_combined_result(
400 &self,
401 request: &NetdataRequest,
402 selected_files: &[SelectedJournalFile],
403 deadline: Option<Instant>,
404 options: &mut NetdataFunctionRunOptions<'_>,
405 combined: &mut CombinedResult,
406 skipped_files: u64,
407 file_errors: Vec<String>,
408 ) -> Result<()> {
409 combined.skipped_files = combined.skipped_files.saturating_add(skipped_files);
410 combined.file_errors.extend(file_errors);
411 if combined.cancelled {
412 return Ok(());
413 }
414 if !request.data_only {
415 combined.add_zero_count_facet_values_from_files(
416 &request.facets,
417 self.config.reader_options,
418 );
419 combined.add_zero_count_selected_filter_values(request);
420 }
421 if should_collect_unfiltered_facet_vocabulary(request, combined) {
422 let vocabulary = self.explore_files(
423 selected_files,
424 &request.unfiltered_vocabulary(),
425 deadline,
426 options,
427 )?;
428 combined.add_zero_count_facet_values(&vocabulary.facets);
429 }
430 Ok(())
431 }
432
433 pub fn run_directory_request_bytes(&self, directory: &Path, request: &[u8]) -> Result<Value> {
434 self.run_directory_request_bytes_with_options(
435 directory,
436 request,
437 NetdataFunctionRunOptions::default(),
438 )
439 }
440
441 pub fn run_directory_request_bytes_with_options(
442 &self,
443 directory: &Path,
444 request: &[u8],
445 options: NetdataFunctionRunOptions<'_>,
446 ) -> Result<Value> {
447 let request: Value = serde_json::from_slice(request).map_err(|err| {
448 SdkError::InvalidPath(format!("invalid Netdata function JSON: {err}"))
449 })?;
450 self.run_directory_request_json_with_options(directory, &request, options)
451 }
452
    /// Explores `files` one after another for `request` and merges the
    /// outcome into a single `CombinedResult`.
    ///
    /// A file that cannot be opened is recorded as skipped by
    /// `open_file_for_explore` and does not abort the run. The outcome of each
    /// per-file exploration is handed to `record_explore_result`; when that
    /// returns `None` the loop moves on to the next file. The loop ends early
    /// when `should_stop_before_file` or `finish_explored_file` says so.
    ///
    /// # Errors
    /// Propagates errors returned by `open_file_for_explore` and
    /// `finish_explored_file`.
    fn explore_files(
        &self,
        files: &[SelectedJournalFile],
        request: &NetdataRequest,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
    ) -> Result<CombinedResult> {
        // Run-wide query: only used here to decide whether sampling is enabled
        // and to seed the sampling state. Each file gets its own query below.
        let query = request.to_explorer_query(
            files.len() as u64,
            None,
            NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
        );
        let mut combined = CombinedResult::default();
        // Shared across all files; wrapped in `RefCell` because the per-file
        // callbacks built in `explore_single_file` borrow it.
        let page_window = RefCell::new(NetdataPageWindow::for_request(request));
        combined.sampling_enabled = query.sampling.is_some();
        let mut sampling_state =
            ExplorerSamplingState::for_query(&query, histogram_bucket_count_for_query(&query));
        let realtime_adjuster = RefCell::new(NetdataRealtimeAdjuster::new(request.direction));
        let started = Instant::now();
        let total_files = files.len();
        for (file_index, file) in files.iter().enumerate() {
            let path = &file.path;
            if should_stop_before_file(&mut combined, deadline, options) {
                break;
            }
            // `None` means the file could not be opened and was already
            // recorded as skipped.
            let Some(mut reader) = self.open_file_for_explore(
                path,
                &mut combined,
                options,
                progress_context(file_index, total_files, started),
            )?
            else {
                continue;
            };
            combined.matched_files = combined.matched_files.saturating_add(1);
            combined.matched_paths.push(path.clone());
            // Per-file query (shadows the run-wide one), built from this
            // file's header and ordering information.
            let query = request.file_query(files.len(), reader.header(), &file.order);
            collect_column_fields_for_file(&mut reader, request, path, &mut combined);
            let explored = self.explore_single_file(
                &mut reader,
                request,
                &query,
                deadline,
                options,
                &combined,
                &page_window,
                sampling_state.as_mut(),
                &realtime_adjuster,
                progress_context(file_index, total_files, started),
                file.order.journal_vs_realtime_delta_usec,
            );
            let Some((result, stop_reason)) = record_explore_result(explored, path, &mut combined)
            else {
                continue;
            };
            // A `true` return ends the whole run, not just this file.
            if finish_explored_file(
                options,
                request,
                file,
                &query,
                result,
                stop_reason,
                &mut combined,
                files,
                file_index,
                progress_context(file_index, total_files, started),
            )? {
                break;
            }
        }
        combined.expand_row_payloads(self.config.reader_options);
        combined.page_counters = Some(page_window.into_inner().counters());
        Ok(combined)
    }
527
528 fn open_file_for_explore(
529 &self,
530 path: &Path,
531 combined: &mut CombinedResult,
532 options: &mut NetdataFunctionRunOptions<'_>,
533 progress: ProgressContext,
534 ) -> Result<Option<FileReader>> {
535 match FileReader::open_with_options(path, self.config.reader_options) {
536 Ok(reader) => Ok(Some(reader)),
537 Err(err) => {
538 combined.skipped_files = combined.skipped_files.saturating_add(1);
539 combined
540 .file_errors
541 .push(format!("{}: {err}", path.display()));
542 emit_progress_for_combined(options, combined, progress);
543 Ok(None)
544 }
545 }
546 }
547
    /// Explores one already-opened journal file with `query`, wiring the
    /// run-wide controls into an `ExplorerControl`.
    ///
    /// The control receives the deadline, the cancellation callback, the
    /// progress interval and callback, the optional sampling state, and three
    /// row callbacks backed by `page_window`, `realtime_adjuster` and
    /// `delta_scan_can_stop`.
    ///
    /// Returns the explorer result together with the control's stop reason.
    ///
    /// # Errors
    /// Propagates the error of the underlying exploration call.
    #[allow(clippy::too_many_arguments)]
    fn explore_single_file(
        &self,
        reader: &mut FileReader,
        request: &NetdataRequest,
        query: &ExplorerQuery,
        deadline: Option<Instant>,
        options: &mut NetdataFunctionRunOptions<'_>,
        combined: &CombinedResult,
        page_window: &RefCell<NetdataPageWindow>,
        sampling_state: Option<&mut ExplorerSamplingState>,
        realtime_adjuster: &RefCell<NetdataRealtimeAdjuster>,
        progress: ProgressContext,
        realtime_delta_usec: u64,
    ) -> Result<(ExplorerResult, Option<ExplorerStopReason>)> {
        // Read these out of `options` first: the progress closure below
        // captures `options` itself.
        let cancellation_callback = options.cancellation_callback;
        let progress_interval = options.progress_interval;
        let mut explorer_progress = |explorer_progress: ExplorerProgress| {
            emit_explorer_progress(options, combined, explorer_progress, progress);
        };
        let mut control = ExplorerControl::new();
        control.set_deadline(deadline);
        control.set_cancellation_callback(cancellation_callback);
        control.set_progress_interval(progress_interval);
        control.set_progress_callback(Some(&mut explorer_progress));
        control.set_sampling_state(sampling_state);
        // Asks the shared page window about a candidate row's timestamp.
        let mut candidate_row =
            |realtime_usec| page_window.borrow().candidate_to_keep(realtime_usec);
        control.set_candidate_row_callback(Some(&mut candidate_row));
        // Routes row timestamps through the shared realtime adjuster.
        let mut adjust_realtime =
            |realtime_usec| realtime_adjuster.borrow_mut().adjust(realtime_usec);
        control.set_realtime_adjust_callback(Some(&mut adjust_realtime));
        // Invoked with the timestamp and running match count of matched rows.
        let mut matched_row = |realtime_usec, rows_matched| {
            delta_scan_can_stop(
                request,
                page_window,
                realtime_usec,
                rows_matched,
                realtime_delta_usec,
            )
        };
        control.set_matched_row_callback(Some(&mut matched_row));
        let result = reader.explore_with_strategy_cursor_rows_controlled(
            query,
            self.config.explorer_strategy,
            &mut control,
        )?;
        Ok((result, control.stop_reason()))
    }
597
598 fn info_response(
599 &self,
600 echo: Value,
601 paths: &[PathBuf],
602 options: &NetdataFunctionRunOptions<'_>,
603 ) -> Value {
604 json!({
605 "_request": echo,
606 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
607 "v": 3,
608 "accepted_params": self.accepted_params_from_fields(&[]),
609 "required_params": self.required_source_params(paths, options),
610 "show_ids": true,
611 "has_history": true,
612 "pagination": {
613 "enabled": true,
614 "key": "anchor",
615 "column": "timestamp",
616 "units": "timestamp_usec",
617 },
618 "status": 200,
619 "type": "table",
620 "help": "Netdata-compatible journal log function backed by the systemd journal SDK"
621 })
622 }
623
624 fn query_response(
625 &self,
626 request: NetdataRequest,
627 annotation_paths: &[PathBuf],
628 combined: CombinedResult,
629 ) -> Value {
630 let not_modified = request.if_modified_since_usec != 0
633 && !combined.partial
634 && combined.stats.rows_matched == 0;
635 if combined.cancelled {
636 return netdata_function_error(499, "Request cancelled.");
637 }
638 if not_modified {
639 return netdata_function_error(304, "No new data since the previous call.");
640 }
641 let artifacts = self.query_response_artifacts(&request, annotation_paths, &combined);
642 let mut response = base_query_response(&request, &combined, &artifacts);
643 let Some(object) = response.as_object_mut() else {
644 return netdata_function_error(500, "Internal Netdata function response error.");
645 };
646 self.add_query_response_metadata(object, &request, &combined, artifacts);
647 response
648 }
649
650 fn query_response_artifacts(
651 &self,
652 request: &NetdataRequest,
653 annotation_paths: &[PathBuf],
654 combined: &CombinedResult,
655 ) -> QueryResponseArtifacts {
656 let reportable_facet_fields = combined.reportable_facet_fields_bytes(&request.facets);
657 let reportable_facet_field_names = string_fields(&reportable_facet_fields);
658 let columns = self.build_columns(
659 &request,
660 &reportable_facet_fields,
661 &combined.rows,
662 &combined.facets,
663 &combined.column_fields,
664 );
665 let boot_ids = response_boot_ids(
666 &columns.order,
667 &combined.rows,
668 &combined.facets,
669 combined.histogram.as_ref(),
670 );
671 let context = DisplayContext {
672 boot_first_realtime: collect_boot_first_realtime(
673 annotation_paths,
674 self.config.reader_options,
675 &boot_ids,
676 ),
677 ..DisplayContext::default()
678 };
679 let data =
680 self.build_data_rows(&context, &columns.order, &combined.rows, request.direction);
681 let facets = self.build_facets(&context, &reportable_facet_fields, &combined.facets);
682 let histogram = combined.histogram.as_ref().map(|histogram| {
683 self.build_histogram(&context, histogram, combined.facets.get(&histogram.field))
684 });
685 let message = query_message(combined.timed_out, &combined.stats);
686 let items = response_items(request, combined, data.len() as u64);
687 QueryResponseArtifacts {
688 reportable_facet_field_names,
689 columns,
690 data,
691 facets,
692 histogram,
693 message,
694 items,
695 }
696 }
697
698 fn add_query_response_metadata(
699 &self,
700 object: &mut Map<String, Value>,
701 request: &NetdataRequest,
702 combined: &CombinedResult,
703 artifacts: QueryResponseArtifacts,
704 ) {
705 if !request.data_only {
706 self.add_full_query_response_metadata(object, request, combined, &artifacts);
707 } else if request.histogram.is_some() {
708 object.insert(
709 "available_histograms".to_string(),
710 self.available_histograms(request, combined),
711 );
712 }
713 add_last_modified_if_needed(object, request, combined);
714 add_sampling_if_needed(object, combined);
715 add_analysis_outputs_if_needed(object, request, artifacts);
716 }
717
718 fn add_full_query_response_metadata(
719 &self,
720 object: &mut Map<String, Value>,
721 request: &NetdataRequest,
722 combined: &CombinedResult,
723 artifacts: &QueryResponseArtifacts,
724 ) {
725 object.insert("message".to_string(), artifacts.message.clone());
726 object.insert("update_every".to_string(), Value::from(1));
727 object.insert("help".to_string(), Value::Null);
728 object.insert(
729 "accepted_params".to_string(),
730 self.accepted_params_from_fields(&artifacts.reportable_facet_field_names),
731 );
732 object.insert("default_sort_column".to_string(), Value::from("timestamp"));
733 object.insert("default_charts".to_string(), Value::Array(Vec::new()));
734 object.insert(
735 "available_histograms".to_string(),
736 self.available_histograms(request, combined),
737 );
738 }
739
740 fn build_columns(
741 &self,
742 request: &NetdataRequest,
743 reportable_facet_fields: &[Vec<u8>],
744 rows: &[LocatedRow],
745 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
746 column_fields: &BTreeSet<String>,
747 ) -> Columns {
748 let mut order = vec!["timestamp".to_string(), "rowOptions".to_string()];
749 push_unique_many(&mut order, &self.config.default_view_keys);
750 push_unique_many(&mut order, &string_fields(reportable_facet_fields));
751 if let Some(histogram) = &request.histogram {
752 push_unique(&mut order, histogram);
753 }
754 for field in column_fields {
755 push_unique(&mut order, field);
756 }
757
758 for (field, values) in facets {
759 if !facet_group_is_reportable(values) {
760 continue;
761 }
762 push_unique(&mut order, &String::from_utf8_lossy(field));
763 }
764 for row in rows {
765 let fields = row_fields(row);
766 for field in fields.keys() {
767 push_unique(&mut order, field);
768 }
769 }
770
771 let mut map = Map::new();
772 for (index, key) in order.iter().enumerate() {
773 map.insert(key.clone(), column_metadata(key, index));
774 }
775 Columns { order, map }
776 }
777
778 fn build_data_rows(
779 &self,
780 context: &DisplayContext,
781 column_order: &[String],
782 rows: &[LocatedRow],
783 direction: Direction,
784 ) -> Vec<Value> {
785 let row_iter: Box<dyn Iterator<Item = &LocatedRow> + '_> = match direction {
786 Direction::Forward => Box::new(rows.iter().rev()),
787 Direction::Backward => Box::new(rows.iter()),
788 };
789 row_iter
790 .map(|located| {
791 let fields = row_fields(located);
792 let mut row = Vec::with_capacity(column_order.len());
793 for column in column_order {
794 let value = match column.as_str() {
795 "timestamp" => Value::from(located.row.realtime_usec),
796 "rowOptions" => self.profile.row_options(&fields),
797 field => first_value(&fields, field)
798 .map(|value| {
799 self.profile.field_display_value(
800 context,
801 DisplayScope::Data,
802 field,
803 value,
804 )
805 })
806 .unwrap_or(Value::Null),
807 };
808 row.push(value);
809 }
810 Value::Array(row)
811 })
812 .collect()
813 }
814
815 fn build_facets(
816 &self,
817 context: &DisplayContext,
818 requested: &[Vec<u8>],
819 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
820 ) -> Value {
821 let mut out = Vec::new();
822 for (order, field) in requested.iter().enumerate() {
823 let values = facets.get(field);
824 let field_name = String::from_utf8_lossy(field).into_owned();
825 let mut options: Vec<_> = values
826 .into_iter()
827 .flat_map(|values| values.iter())
828 .filter(|(value, count)| {
829 (!value.is_empty() && value.as_slice() != b"-")
830 || (**count == 0 && value.is_empty())
831 })
832 .map(|(value, count)| {
833 if *count == 0 && value.is_empty() {
834 return json!({
835 "id": NETDATA_EMPTY_STRING_FACET_HASH_ID,
836 "name": NETDATA_UNAVAILABLE_FIELD_LABEL,
837 "count": count,
838 });
839 }
840 json!({
841 "id": String::from_utf8_lossy(value).into_owned(),
842 "name": self.profile.facet_option_name(context, &field_name, value),
843 "count": count,
844 })
845 })
846 .collect();
847 sort_facet_options(&field_name, &mut options);
848 for (idx, option) in options.iter_mut().enumerate() {
849 if let Some(object) = option.as_object_mut() {
850 object.insert("order".to_string(), Value::from((idx + 1) as u64));
851 }
852 }
853 out.push(json!({
854 "id": field_name,
855 "name": String::from_utf8_lossy(field).into_owned(),
856 "order": order + 1,
857 "options": options,
858 }));
859 }
860 Value::Array(out)
861 }
862
    /// Builds the histogram object of the response (`id`, `name` and a
    /// `chart` with `summary`, `totals`, `result`, `db`, `view` and `agents`).
    ///
    /// Dimensions are the values seen in any bucket plus the keys of
    /// `known_values`, excluding empty values and `-`. Each data point is
    /// `[time, [count, 0, 0], ...]` with one triple per dimension.
    fn build_histogram(
        &self,
        context: &DisplayContext,
        histogram: &ExplorerHistogram,
        known_values: Option<&BTreeMap<Vec<u8>, u64>>,
    ) -> Value {
        let field = String::from_utf8_lossy(&histogram.field).into_owned();
        let mut dimension_ids = BTreeSet::new();
        let mut buckets = Vec::with_capacity(histogram.buckets.len());
        for bucket in &histogram.buckets {
            // Re-accumulate the bucket's counts through
            // `add_netdata_facet_count` (its exact rules live outside this
            // view), then remember every resulting value as a dimension.
            let mut values = BTreeMap::new();
            for (value, count) in &bucket.values {
                add_netdata_facet_count(&mut values, value, *count);
            }
            for value in values.keys() {
                dimension_ids.insert(value.clone());
            }
            buckets.push((bucket.start_realtime_usec, values));
        }
        // Dimensions that actually occur in at least one bucket.
        let actual_dimension_ids = dimension_ids.clone();
        if let Some(known_values) = known_values {
            // Known values become dimensions too, except empty and `-`.
            for value in known_values.keys() {
                if value.is_empty() || value.as_slice() == b"-" {
                    continue;
                }
                dimension_ids.insert(value.clone());
            }
        }
        let dimension_ids: Vec<Vec<u8>> = dimension_ids.into_iter().collect();
        let metadata = self.histogram_chart_metadata(
            context,
            &field,
            &buckets,
            &actual_dimension_ids,
            &dimension_ids,
        );
        let data: Vec<Value> = buckets
            .iter()
            .map(|(start_realtime_usec, values)| {
                let mut point = Vec::with_capacity(dimension_ids.len() + 1);
                // Bucket start divided by 1000 (microseconds to milliseconds).
                point.push(Value::from(start_realtime_usec / 1000));
                for value in &dimension_ids {
                    // Count in this bucket; 0 when the dimension occurs in
                    // other buckets only; null when it is merely "known".
                    let count = values
                        .get(value)
                        .copied()
                        .map(Value::from)
                        .unwrap_or_else(|| {
                            if actual_dimension_ids.contains(value) {
                                Value::from(0)
                            } else {
                                Value::Null
                            }
                        });
                    // Positions match `result.point` below: value, arp, pa.
                    point.push(Value::Array(vec![count, Value::from(0), Value::from(0)]));
                }
                Value::Array(point)
            })
            .collect();

        json!({
            "id": field,
            "name": field,
            "chart": {
                "summary": metadata.summary,
                "totals": metadata.totals,
                "result": {
                    "labels": metadata.result_labels,
                    "point": { "value": 0, "arp": 1, "pa": 2 },
                    "data": data,
                },
                "db": {
                    "tiers": 1,
                    "update_every": histogram_update_every_seconds(histogram),
                    "units": "events",
                    "dimensions": {
                        "ids": metadata.ids.clone(),
                        "names": metadata.names.clone(),
                        "units": metadata.units.clone(),
                        "sts": metadata.stats.clone(),
                    },
                    "per_tier": [{
                        "tier": 0,
                        "queries": 1,
                        "points": metadata.points,
                        "update_every": histogram_update_every_seconds(histogram),
                    }],
                },
                "view": {
                    "title": format!("Events Distribution by {}", String::from_utf8_lossy(&histogram.field)),
                    "update_every": histogram_update_every_seconds(histogram),
                    "after": histogram_after_seconds(histogram),
                    "before": histogram_before_seconds(histogram),
                    "units": "events",
                    "chart_type": "stackedBar",
                    "dimensions": {
                        "grouped_by": ["dimension"],
                        "ids": metadata.ids,
                        "names": metadata.names,
                        "colors": metadata.colors,
                        "units": metadata.units,
                        "sts": metadata.stats,
                    },
                    "min": metadata.min,
                    "max": metadata.max,
                },
                "agents": [{
                    "mg": "default",
                    "nm": "facets.histogram",
                    "now": now_unix_seconds(),
                    "ai": 0,
                }],
            }
        })
    }
977
    /// Computes the per-dimension and overall metadata of a histogram chart.
    ///
    /// * `buckets` — `(start, value -> count)` pairs as prepared by
    ///   `build_histogram`.
    /// * `actual_dimensions` — the dimensions `build_histogram` found in at
    ///   least one bucket.
    /// * `dimensions` — every dimension to report, in output order.
    ///
    /// Returns ids, display names, colors (all `null`), units (all
    /// `"events"`), per-dimension statistics, the summary section, the totals,
    /// the result labels (`"time"` followed by the display names) and the
    /// overall `min`, `max` and `points`.
    fn histogram_chart_metadata(
        &self,
        context: &DisplayContext,
        field: &str,
        buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
        actual_dimensions: &BTreeSet<Vec<u8>>,
        dimensions: &[Vec<u8>],
    ) -> HistogramChartMetadata {
        let mut ids = Vec::with_capacity(dimensions.len());
        let mut names = Vec::with_capacity(dimensions.len());
        let mut colors = Vec::with_capacity(dimensions.len());
        let mut units = Vec::with_capacity(dimensions.len());
        let mut min_values = Vec::with_capacity(dimensions.len());
        let mut max_values = Vec::with_capacity(dimensions.len());
        let mut avg_values = Vec::with_capacity(dimensions.len());
        let mut arp_values = Vec::with_capacity(dimensions.len());
        let mut con_values = Vec::with_capacity(dimensions.len());
        let mut summary_dimensions = Vec::with_capacity(dimensions.len());
        let mut result_labels = vec![Value::String("time".to_string())];

        // Grand total of all counts over every dimension and bucket; the
        // denominator of each dimension's contribution percentage.
        let total: u64 = dimensions
            .iter()
            .map(|dimension| {
                buckets
                    .iter()
                    .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
                    .sum::<u64>()
            })
            .sum();

        let mut min = 0;
        let mut max = 0;
        let mut points = 0;
        for (priority, dimension) in dimensions.iter().enumerate() {
            let id = String::from_utf8_lossy(dimension).into_owned();
            // Display name from the profile; non-string JSON values are
            // used in their serialized form.
            let display = match self.profile.field_display_value(
                context,
                DisplayScope::Histogram,
                field,
                dimension,
            ) {
                Value::String(value) => value,
                other => other.to_string(),
            };
            let (dimension_min, dimension_max, dimension_sum, actual) =
                histogram_dimension_stats(buckets, actual_dimensions, dimension);
            // Average per bucket; 0.0 for non-actual dimensions or when
            // there are no buckets (avoids dividing by zero).
            let dimension_average = if actual && !buckets.is_empty() {
                dimension_sum as f64 / buckets.len() as f64
            } else {
                0.0
            };
            if actual {
                // While `points` is still zero the minimum is seeded from
                // this dimension; afterwards it only ever decreases.
                if points == 0 || dimension_min < min {
                    min = dimension_min;
                }
                if dimension_max > max {
                    max = dimension_max;
                }
                points += buckets.len() as u64;
            }
            // Share of the grand total, as a percentage.
            let contribution = if total > 0 {
                dimension_sum as f64 * 100.0 / total as f64
            } else {
                0.0
            };

            ids.push(Value::String(id.clone()));
            names.push(Value::String(display.clone()));
            colors.push(Value::Null);
            units.push(Value::String("events".to_string()));
            result_labels.push(Value::String(display.clone()));
            min_values.push(Value::from(dimension_min));
            max_values.push(Value::from(dimension_max));
            avg_values.push(Value::from(dimension_average));
            arp_values.push(Value::from(0));
            con_values.push(Value::from(contribution));
            summary_dimensions.push(json!({
                "id": id,
                "nm": display,
                "ds": { "sl": bool_to_u64(actual), "qr": bool_to_u64(actual) },
                "sts": {
                    "min": dimension_min,
                    "max": dimension_max,
                    "avg": dimension_average,
                    "con": contribution,
                },
                "pri": priority as u64,
            }));
        }

        // Column-oriented statistics: one array per statistic, indexed like
        // `ids`.
        let stats = json!({
            "min": min_values,
            "max": max_values,
            "avg": avg_values,
            "arp": arp_values,
            "con": con_values,
        });
        let summary_stats = json!({
            "min": min,
            "max": max,
            "avg": histogram_average(total, points),
            "con": 100.0,
        });
        let dimensions_len = dimensions.len() as u64;

        HistogramChartMetadata {
            ids,
            names,
            colors,
            units,
            stats,
            summary: json!({
                "nodes": [histogram_summary_node(dimensions_len, points, &summary_stats)],
                "contexts": [histogram_summary_context(dimensions_len, points, &summary_stats)],
                "instances": [histogram_summary_instance(dimensions_len, points, &summary_stats)],
                "dimensions": summary_dimensions,
                "labels": [],
                "alerts": [],
            }),
            totals: histogram_totals(dimensions_len),
            result_labels,
            min,
            max,
            points,
        }
    }
1104
1105 fn accepted_params_from_fields(&self, fields: &[String]) -> Value {
1106 NETDATA_ACCEPTED_PARAMS
1107 .iter()
1108 .copied()
1109 .chain(fields.iter().map(String::as_str))
1110 .map(|field| Value::String(field.to_string()))
1111 .collect()
1112 }
1113
1114 fn required_source_params(
1115 &self,
1116 paths: &[PathBuf],
1117 options: &NetdataFunctionRunOptions<'_>,
1118 ) -> Value {
1119 let mut all = JournalSourceSummary::default();
1120 let mut local = JournalSourceSummary::default();
1121 let mut local_namespaces = JournalSourceSummary::default();
1122 let mut local_system = JournalSourceSummary::default();
1123 let mut local_user = JournalSourceSummary::default();
1124 let mut remote = JournalSourceSummary::default();
1125 let mut other = JournalSourceSummary::default();
1126 let mut exact = BTreeMap::<String, JournalSourceSummary>::new();
1127
1128 for path in paths {
1129 let metadata = file_metadata(options, path);
1130 let source_type = metadata
1131 .as_ref()
1132 .and_then(|metadata| metadata.source_type)
1133 .unwrap_or_else(|| journal_file_source_type(path));
1134 all.add_path(path, self.config.reader_options, metadata.as_ref());
1135 if source_type & SOURCE_TYPE_LOCAL_ALL != 0 {
1136 local.add_path(path, self.config.reader_options, metadata.as_ref());
1137 }
1138 if source_type & SOURCE_TYPE_LOCAL_NAMESPACE != 0 {
1139 local_namespaces.add_path(path, self.config.reader_options, metadata.as_ref());
1140 }
1141 if source_type & SOURCE_TYPE_LOCAL_SYSTEM != 0 {
1142 local_system.add_path(path, self.config.reader_options, metadata.as_ref());
1143 }
1144 if source_type & SOURCE_TYPE_LOCAL_USER != 0 {
1145 local_user.add_path(path, self.config.reader_options, metadata.as_ref());
1146 }
1147 if source_type & SOURCE_TYPE_REMOTE_ALL != 0 {
1148 remote.add_path(path, self.config.reader_options, metadata.as_ref());
1149 }
1150 if source_type & SOURCE_TYPE_LOCAL_OTHER != 0 {
1151 other.add_path(path, self.config.reader_options, metadata.as_ref());
1152 }
1153 let source_name = metadata
1154 .as_ref()
1155 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1156 .or_else(|| journal_file_exact_source_name(path));
1157 if let Some(source_name) = source_name {
1158 exact.entry(source_name).or_default().add_path(
1159 path,
1160 self.config.reader_options,
1161 metadata.as_ref(),
1162 );
1163 }
1164 }
1165
1166 let mut source_options = Vec::new();
1167 push_source_option(&mut source_options, "all", &all);
1168 push_source_option(&mut source_options, "all-local-logs", &local);
1169 push_source_option(
1170 &mut source_options,
1171 "all-local-namespaces",
1172 &local_namespaces,
1173 );
1174 push_source_option(&mut source_options, "all-local-system-logs", &local_system);
1175 push_source_option(&mut source_options, "all-local-user-logs", &local_user);
1176 push_source_option(&mut source_options, "all-remote-systems", &remote);
1177 push_source_option(&mut source_options, "all-uncategorized", &other);
1178 for (name, summary) in exact {
1179 push_source_option(&mut source_options, &name, &summary);
1180 }
1181
1182 json!([{
1183 "id": "__logs_sources",
1184 "name": "Journal Sources",
1185 "help": "Select the logs source to query",
1186 "type": "multiselect",
1187 "options": source_options,
1188 }])
1189 }
1190
1191 fn available_histograms(&self, request: &NetdataRequest, combined: &CombinedResult) -> Value {
1192 let mut fields = combined.reportable_facet_fields(&request.facets);
1193 if request.data_only {
1194 if let Some(histogram) = &request.histogram {
1195 push_unique(&mut fields, histogram);
1196 }
1197 }
1198 let mut sorted = fields.clone();
1199 sorted.sort_by(|left, right| netdata_reorder_key(left).cmp(&netdata_reorder_key(right)));
1200 let order_by_field: BTreeMap<String, usize> = sorted
1201 .into_iter()
1202 .enumerate()
1203 .map(|(index, field)| (field, index + 1))
1204 .collect();
1205
1206 fields
1207 .into_iter()
1208 .map(|field| {
1209 let order = order_by_field.get(&field).copied().unwrap_or(0);
1210 json!({
1211 "id": field,
1212 "name": field,
1213 "order": order,
1214 })
1215 })
1216 .collect()
1217 }
1218}
1219
/// Per-dimension and aggregate metadata assembled for a histogram chart response.
struct HistogramChartMetadata {
    /// Dimension ids, one JSON string per dimension.
    ids: Vec<Value>,
    /// Display names of the dimensions, parallel to `ids`.
    names: Vec<Value>,
    /// Per-dimension colors, parallel to `ids`.
    colors: Vec<Value>,
    /// Per-dimension unit strings, parallel to `ids`.
    units: Vec<Value>,
    /// JSON object with parallel `min`/`max`/`avg`/`arp`/`con` arrays.
    stats: Value,
    /// JSON summary object (`nodes`, `contexts`, `instances`, `dimensions`, `labels`, `alerts`).
    summary: Value,
    /// Selected/queried totals as produced by `histogram_totals`.
    totals: Value,
    /// Result column labels: `"time"` followed by the dimension display names.
    result_labels: Vec<Value>,
    /// Smallest per-bucket count across the dimensions that were actually observed.
    min: u64,
    /// Largest per-bucket count across the dimensions that were actually observed.
    max: u64,
    /// Number of points: bucket count accumulated once per observed dimension.
    points: u64,
}
1233
/// Computes `(min, max, sum, actual)` of one dimension's per-bucket counts.
///
/// A dimension missing from `actual_dimensions` yields `(0, 0, 0, false)`.
/// Otherwise a bucket that lacks the dimension counts as `0`, and an empty
/// bucket list yields `(0, 0, 0, true)`.
fn histogram_dimension_stats(
    buckets: &[(u64, BTreeMap<Vec<u8>, u64>)],
    actual_dimensions: &BTreeSet<Vec<u8>>,
    dimension: &[u8],
) -> (u64, u64, u64, bool) {
    if !actual_dimensions.contains(dimension) {
        return (0, 0, 0, false);
    }
    let (lowest, highest, total) = buckets
        .iter()
        .map(|(_, values)| values.get(dimension).copied().unwrap_or(0))
        .fold(None, |acc: Option<(u64, u64, u64)>, count| {
            Some(match acc {
                // The first bucket seeds all three accumulators.
                None => (count, count, count),
                Some((lo, hi, sum)) => (lo.min(count), hi.max(count), sum + count),
            })
        })
        .unwrap_or((0, 0, 0));
    (lowest, highest, total, true)
}
1257
/// Returns `total / points` as a float, or `0.0` when there are no points.
fn histogram_average(total: u64, points: u64) -> f64 {
    match points {
        0 => 0.0,
        _ => total as f64 / points as f64,
    }
}
1265
1266fn histogram_summary_node(dimensions: u64, points: u64, stats: &Value) -> Value {
1267 let mut node = json!({
1268 "mg": "default",
1269 "nm": "facets.histogram",
1270 "ni": 0,
1271 "st": { "ai": 0, "code": 200, "msg": "" },
1272 });
1273 add_histogram_summary_shape(&mut node, dimensions, points, stats, true);
1274 node
1275}
1276
1277fn histogram_summary_context(dimensions: u64, points: u64, stats: &Value) -> Value {
1278 let mut context = json!({ "id": "facets.histogram" });
1279 add_histogram_summary_shape(&mut context, dimensions, points, stats, true);
1280 context
1281}
1282
1283fn histogram_summary_instance(dimensions: u64, points: u64, stats: &Value) -> Value {
1284 let mut instance = json!({ "id": "facets.histogram", "ni": 0 });
1285 add_histogram_summary_shape(&mut instance, dimensions, points, stats, false);
1286 instance
1287}
1288
1289fn add_histogram_summary_shape(
1290 object: &mut Value,
1291 dimensions: u64,
1292 points: u64,
1293 stats: &Value,
1294 include_instances: bool,
1295) {
1296 let Some(map) = object.as_object_mut() else {
1297 return;
1298 };
1299 if dimensions > 0 {
1300 if include_instances {
1301 map.insert("is".to_string(), json!({ "sl": 1, "qr": 1 }));
1302 }
1303 map.insert(
1304 "ds".to_string(),
1305 json!({ "sl": dimensions, "qr": dimensions }),
1306 );
1307 }
1308 if points > 0 {
1309 map.insert("sts".to_string(), stats.clone());
1310 }
1311}
1312
1313fn histogram_totals(dimensions: u64) -> Value {
1314 let mut totals = json!({ "nodes": { "sl": 1, "qr": 1 } });
1315 if dimensions > 0 {
1316 if let Some(map) = totals.as_object_mut() {
1317 map.insert("contexts".to_string(), json!({ "sl": 1, "qr": 1 }));
1318 map.insert("instances".to_string(), json!({ "sl": 1, "qr": 1 }));
1319 map.insert(
1320 "dimensions".to_string(),
1321 json!({ "sl": dimensions, "qr": dimensions }),
1322 );
1323 }
1324 }
1325 totals
1326}
1327
/// Converts a flag to `1` (true) or `0` (false) for JSON count fields.
fn bool_to_u64(value: bool) -> u64 {
    // The standard lossless conversion replaces the hand-written branch.
    u64::from(value)
}
1331
1332fn histogram_after_seconds(histogram: &ExplorerHistogram) -> u64 {
1333 histogram
1334 .buckets
1335 .first()
1336 .map(|bucket| bucket.start_realtime_usec / 1_000_000)
1337 .unwrap_or(0)
1338}
1339
1340fn histogram_before_seconds(histogram: &ExplorerHistogram) -> u64 {
1341 histogram
1342 .buckets
1343 .last()
1344 .map(|bucket| bucket.end_realtime_usec / 1_000_000)
1345 .unwrap_or(0)
1346}
1347
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of an error.
fn now_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
1354
/// A Netdata function request after parsing and normalization (see `NetdataRequest::parse`).
#[derive(Debug, Clone)]
struct NetdataRequest {
    /// Value of the request's `info` flag (defaults to `false`).
    info: bool,
    /// Normalized echo of the request, as built by `normalized_request_echo`.
    echo: Value,
    /// Lower bound of the time window in microseconds, if any.
    after_realtime_usec: Option<u64>,
    /// Upper bound of the time window in microseconds, if any.
    before_realtime_usec: Option<u64>,
    /// Value of `if_modified_since` (defaults to `0`).
    if_modified_since_usec: u64,
    /// Paging anchor resolved by `request_anchor_and_direction`.
    anchor: ExplorerAnchor,
    /// Traversal direction resolved by `request_anchor_and_direction`.
    direction: Direction,
    /// Row limit used for querying; `parse` raises the requested limit to at least 2.
    limit: usize,
    /// Value of the request's `data_only` flag (defaults to `false`).
    data_only: bool,
    /// Delta mode, as resolved by `request_delta`.
    delta: bool,
    /// Tail mode, as resolved by `request_tail`.
    tail: bool,
    /// Sampling budget; `0` disables sampling in `to_explorer_query`.
    sampling: u64,
    /// Bitmask of `SOURCE_TYPE_*` flags selected by the request.
    source_type: u64,
    /// Source names that must match a file's exact source name.
    exact_sources: Vec<String>,
    /// Field filters parsed from the request's `selections`.
    filters: Vec<ExplorerFilter>,
    /// Facet field names (raw bytes) to analyze.
    facets: Vec<Vec<u8>>,
    /// Field used as the histogram dimension, if any.
    histogram: Option<String>,
    /// Full-text search terms parsed from the request query.
    fts_terms: Vec<ExplorerFtsPattern>,
    /// Full-text patterns parsed from the request query.
    fts_patterns: Vec<Vec<u8>>,
    /// Negative full-text patterns parsed from the request query.
    fts_negative_patterns: Vec<Vec<u8>>,
}
1378
1379impl NetdataRequest {
    /// Parses and normalizes a Netdata function request.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::InvalidPath` when `value` is not a JSON object.
    fn parse(value: &Value, config: &NetdataFunctionConfig) -> Result<Self> {
        let object = value.as_object().ok_or_else(|| {
            SdkError::InvalidPath("Netdata function request must be a JSON object".to_string())
        })?;
        let now_seconds = unix_now_seconds();
        let info = get_bool(object, "info").unwrap_or(false);
        let after = get_i64(object, "after");
        let before = get_i64(object, "before");
        // The raw `after`/`before` values are normalized against the current time.
        let (after_realtime_usec, before_realtime_usec) =
            normalize_time_window(now_seconds, after, before);
        let direction = request_direction(object);
        let if_modified_since_usec = get_u64(object, "if_modified_since").unwrap_or_default();
        let data_only = get_bool(object, "data_only").unwrap_or(false);
        let delta = request_delta(data_only, object);
        let tail = request_tail(data_only, if_modified_since_usec, object);
        let sampling = get_u64(object, "sampling").unwrap_or(DEFAULT_ITEMS_SAMPLING);
        // The anchor resolution may override the direction requested above.
        let (anchor, direction) = request_anchor_and_direction(
            object,
            tail,
            direction,
            after_realtime_usec,
            before_realtime_usec,
        );
        let requested_limit = request_limit(object);
        // The query always uses at least 2 rows; the echo reports the requested limit.
        let limit = requested_limit.max(2);
        let requested_facets = parse_string_array(object.get("facets"));
        let facets = request_facets(&requested_facets, config);
        let requested_histogram = request_histogram(object);
        let histogram = request_histogram_or_default(&requested_histogram, config);
        let requested_query = request_query(object);
        // No query means no full-text terms or patterns at all.
        let (fts_terms, fts_patterns, fts_negative_patterns) = requested_query
            .as_deref()
            .map(parse_fts_query_patterns)
            .unwrap_or_default();
        // `selections` feeds both the source selection and the field filters.
        let source_selection = parse_source_selection(object.get("selections"));
        let filters = parse_filters(object.get("selections"));

        let echo_input = RequestEchoInput {
            info,
            after_realtime_usec,
            before_realtime_usec,
            if_modified_since_usec,
            anchor,
            direction,
            limit: requested_limit,
            data_only,
            delta,
            tail,
            sampling,
            source_type: source_selection.source_type,
            requested_facets: requested_facets.as_deref(),
            selections: object.get("selections"),
            histogram: requested_histogram.as_deref(),
            query: requested_query.as_deref(),
        };
        let echo = normalized_request_echo(&echo_input);

        Ok(Self {
            info,
            echo,
            after_realtime_usec,
            before_realtime_usec,
            if_modified_since_usec,
            anchor,
            direction,
            limit,
            data_only,
            delta,
            tail,
            sampling,
            source_type: source_selection.source_type,
            exact_sources: source_selection.exact_sources,
            filters,
            facets,
            histogram,
            fts_terms,
            fts_patterns,
            fts_negative_patterns,
        })
    }
1460
1461 fn matches_source(&self, path: &Path, metadata: Option<&NetdataJournalFileMetadata>) -> bool {
1462 if self.source_type == SOURCE_TYPE_ALL && self.exact_sources.is_empty() {
1463 return true;
1464 }
1465 if self.source_type & SOURCE_TYPE_ALL != 0 {
1466 return true;
1467 }
1468 let file_source_type = metadata
1469 .and_then(|metadata| metadata.source_type)
1470 .unwrap_or_else(|| journal_file_source_type(path));
1471 if file_source_type & self.source_type != 0 {
1472 return true;
1473 }
1474 if self.exact_sources.is_empty() {
1475 return false;
1476 }
1477 let source_name = metadata
1478 .and_then(|metadata| metadata.source_name.as_deref().map(str::to_owned))
1479 .or_else(|| journal_file_exact_source_name(path));
1480 self.exact_sources
1481 .iter()
1482 .any(|source| source_name.as_deref() == Some(source.as_str()))
1483 }
1484
    /// Translates this request into an `ExplorerQuery` for one journal file.
    ///
    /// `matched_files` and `file_header` feed the sampling configuration;
    /// `realtime_slack_usec` is normalized by
    /// `normalize_journal_vs_realtime_delta_usec` before being stored.
    fn to_explorer_query(
        &self,
        matched_files: u64,
        file_header: Option<FileHeader>,
        realtime_slack_usec: u64,
    ) -> ExplorerQuery {
        // Facets, histogram and sampling are only produced outside pure data-only
        // mode, or when a delta was requested.
        let analysis_enabled = !self.data_only || self.delta;
        let tail_anchor = self.tail && matches!(self.anchor, ExplorerAnchor::Realtime(_));
        let backward_page_anchor = self.data_only
            && !tail_anchor
            && self.direction == Direction::Backward
            && matches!(self.anchor, ExplorerAnchor::Realtime(_));
        // A tail anchor is folded into the lower time bound.
        let after_realtime_usec = if tail_anchor {
            tail_after_realtime_bound(self.after_realtime_usec, self.anchor)
        } else {
            self.after_realtime_usec
        };
        // A backward paging anchor is folded into the upper time bound.
        let before_realtime_usec = if backward_page_anchor {
            before_realtime_bound_excluding_anchor(self.before_realtime_usec, self.anchor)
        } else {
            self.before_realtime_usec
        };
        // Once folded into a time bound, the anchor itself is replaced with `Auto`.
        let anchor = if tail_anchor || backward_page_anchor {
            ExplorerAnchor::Auto
        } else {
            self.anchor
        };
        // Sampling needs analysis, a non-zero budget, matched files and a bounded window.
        let sampling = (analysis_enabled
            && self.sampling != 0
            && matched_files != 0
            && after_realtime_usec.is_some()
            && before_realtime_usec.is_some())
        .then(|| {
            // Without a header every field is treated as zero.
            let header = file_header.unwrap_or(FileHeader {
                signature: [0; 8],
                compatible_flags: 0,
                incompatible_flags: 0,
                state: 0,
                header_size: 0,
                n_entries: 0,
                head_entry_realtime: 0,
                tail_entry_realtime: 0,
                head_entry_seqnum: 0,
                tail_entry_seqnum: 0,
                tail_entry_boot_id: [0; 16],
                seqnum_id: [0; 16],
            });
            // Use the seqnum span (tail - head + 1) when both seqnums are non-zero
            // and the arithmetic does not overflow; otherwise fall back to `n_entries`.
            let messages_in_file = header
                .tail_entry_seqnum
                .checked_sub(header.head_entry_seqnum)
                .and_then(|span| span.checked_add(1))
                .filter(|_| header.head_entry_seqnum != 0 && header.tail_entry_seqnum != 0)
                .unwrap_or(header.n_entries);
            ExplorerSampling {
                budget: self.sampling,
                matched_files,
                file_head_realtime_usec: header.head_entry_realtime,
                file_tail_realtime_usec: header.tail_entry_realtime,
                file_head_seqnum: header.head_entry_seqnum,
                file_tail_seqnum: header.tail_entry_seqnum,
                file_entries: messages_in_file,
            }
        });
        ExplorerQuery {
            after_realtime_usec,
            before_realtime_usec,
            anchor,
            direction: self.direction,
            limit: self.limit,
            filters: self.filters.clone(),
            facets: analysis_enabled
                .then(|| self.facets.clone())
                .unwrap_or_default(),
            histogram: analysis_enabled
                .then(|| {
                    self.histogram
                        .as_ref()
                        .map(|field| field.as_bytes().to_vec())
                })
                .flatten(),
            histogram_target_buckets: DEFAULT_HISTOGRAM_BUCKETS,
            fts_terms: self.fts_terms.clone(),
            fts_patterns: self.fts_patterns.clone(),
            fts_negative_patterns: self.fts_negative_patterns.clone(),
            field_mode: ExplorerFieldMode::FirstValue,
            // Enabled only when the filters span more than one distinct field.
            exclude_facet_field_filters: self.distinct_filter_fields() > 1,
            use_source_realtime: true,
            realtime_slack_usec: normalize_journal_vs_realtime_delta_usec(realtime_slack_usec),
            // Data-only queries may stop once the page is full, except when tailing.
            stop_when_rows_full: self.data_only && !tail_anchor,
            stop_when_rows_full_check_every: DATA_ONLY_CHECK_EVERY_ROWS,
            sampling,
            debug_collect_column_fields_by_row_traversal: false,
        }
    }
1579
1580 fn file_query(
1581 &self,
1582 matched_files: usize,
1583 file_header: FileHeader,
1584 order: &JournalFileOrderInfo,
1585 ) -> ExplorerQuery {
1586 let mut query = self.to_explorer_query(
1587 matched_files as u64,
1588 Some(file_header),
1589 order.journal_vs_realtime_delta_usec,
1590 );
1591 if self.data_only && self.delta {
1592 query.stop_when_rows_full = false;
1593 }
1594 query
1595 }
1596
1597 fn unfiltered_vocabulary(&self) -> Self {
1598 let mut request = self.clone();
1599 request.filters.clear();
1600 request.histogram = None;
1601 request.limit = 0;
1602 request.fts_terms.clear();
1603 request.fts_patterns.clear();
1604 request.fts_negative_patterns.clear();
1605 request
1606 }
1607
1608 fn distinct_filter_fields(&self) -> usize {
1609 self.filters
1610 .iter()
1611 .map(|filter| filter.field.as_slice())
1612 .collect::<HashSet<_>>()
1613 .len()
1614 }
1615}
1616
/// A result row together with the journal file it was read from.
#[derive(Debug, Clone)]
struct LocatedRow {
    /// Path of the journal file that produced `row`; reopened when payloads are expanded.
    file_path: PathBuf,
    /// The explorer row (its cursor locates the entry inside `file_path`).
    row: ExplorerRow,
}
1622
/// Tracks the inclusive range of timestamps most recently handed out by `adjust`,
/// so that repeated timestamps can be nudged apart in the traversal direction.
#[derive(Debug)]
struct NetdataRealtimeAdjuster {
    /// Direction in which colliding timestamps are shifted.
    direction: Direction,
    /// Lower end of the last emitted range; decremented on backward collisions.
    last_realtime_from: u64,
    /// Upper end of the last emitted range; incremented on forward collisions.
    last_realtime_to: u64,
}
1629
1630impl NetdataRealtimeAdjuster {
1631 fn new(direction: Direction) -> Self {
1632 Self {
1633 direction,
1634 last_realtime_from: 0,
1635 last_realtime_to: 0,
1636 }
1637 }
1638
1639 fn adjust(&mut self, realtime_usec: u64) -> u64 {
1640 match self.direction {
1641 Direction::Backward => {
1642 if realtime_usec >= self.last_realtime_from
1643 && realtime_usec <= self.last_realtime_to
1644 {
1645 self.last_realtime_from = self.last_realtime_from.saturating_sub(1);
1646 self.last_realtime_from
1647 } else {
1648 self.last_realtime_from = realtime_usec;
1649 self.last_realtime_to = realtime_usec;
1650 realtime_usec
1651 }
1652 }
1653 Direction::Forward => {
1654 if realtime_usec >= self.last_realtime_from
1655 && realtime_usec <= self.last_realtime_to
1656 {
1657 self.last_realtime_to = self.last_realtime_to.saturating_add(1);
1658 self.last_realtime_to
1659 } else {
1660 self.last_realtime_from = realtime_usec;
1661 self.last_realtime_to = realtime_usec;
1662 realtime_usec
1663 }
1664 }
1665 }
1666 }
1667}
1668
/// Aggregated description of a group of journal files (one selectable source).
#[derive(Debug, Default)]
struct JournalSourceSummary {
    /// Number of added paths whose filesystem metadata could be read.
    files: u64,
    /// Sum of the on-disk sizes of those files, in bytes.
    total_size: u64,
    /// Earliest known first-entry timestamp across the files, in microseconds.
    first_realtime_usec: Option<u64>,
    /// Latest known last-entry timestamp across the files, in microseconds.
    last_realtime_usec: Option<u64>,
}
1676
1677impl JournalSourceSummary {
1678 #[cfg(test)]
1679 fn from_paths(
1680 paths: &[PathBuf],
1681 reader_options: ReaderOptions,
1682 options: &NetdataFunctionRunOptions<'_>,
1683 ) -> Self {
1684 let mut summary = Self::default();
1685 for path in paths {
1686 let metadata = file_metadata(options, path);
1687 summary.add_path(path, reader_options, metadata.as_ref());
1688 }
1689 summary
1690 }
1691
1692 fn add_path(
1693 &mut self,
1694 path: &Path,
1695 reader_options: ReaderOptions,
1696 metadata: Option<&NetdataJournalFileMetadata>,
1697 ) {
1698 if let Ok(metadata) = std::fs::metadata(path) {
1699 self.files = self.files.saturating_add(1);
1700 self.total_size = self.total_size.saturating_add(metadata.len());
1701 }
1702 if let Some(metadata) = metadata {
1703 let metadata_first = metadata.msg_first_realtime_usec;
1704 let metadata_last = metadata.msg_last_realtime_usec;
1705 if let Some(first) = metadata_first {
1706 self.first_realtime_usec = Some(
1707 self.first_realtime_usec
1708 .map_or(first, |current| current.min(first)),
1709 );
1710 }
1711 if let Some(last) = metadata_last {
1712 self.last_realtime_usec = Some(
1713 self.last_realtime_usec
1714 .map_or(last, |current| current.max(last)),
1715 );
1716 }
1717 if metadata_first.is_some() && metadata_last.is_some() {
1718 return;
1719 }
1720 }
1721 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
1722 return;
1723 };
1724 let header = reader.header();
1725 if header.head_entry_realtime != 0 {
1726 self.first_realtime_usec = Some(
1727 self.first_realtime_usec
1728 .map_or(header.head_entry_realtime, |current| {
1729 current.min(header.head_entry_realtime)
1730 }),
1731 );
1732 }
1733 if header.tail_entry_realtime != 0 {
1734 self.last_realtime_usec = Some(
1735 self.last_realtime_usec
1736 .map_or(header.tail_entry_realtime, |current| {
1737 current.max(header.tail_entry_realtime)
1738 }),
1739 );
1740 }
1741 }
1742
1743 fn info(&self) -> String {
1744 let coverage = match (self.first_realtime_usec, self.last_realtime_usec) {
1745 (Some(first), Some(last)) if last >= first => {
1746 human_duration_seconds((last - first) / 1_000_000)
1747 }
1748 _ => "0s".to_string(),
1749 };
1750 let last_entry = self
1751 .last_realtime_usec
1752 .and_then(|usec| DateTime::<Utc>::from_timestamp((usec / 1_000_000) as i64, 0))
1753 .map(|datetime| datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1754 .unwrap_or_else(|| "unknown".to_string());
1755 format!(
1756 "{} files, total size {}, covering {}, last entry at {}",
1757 self.files,
1758 human_binary_size(self.total_size),
1759 coverage,
1760 last_entry
1761 )
1762 }
1763}
1764
1765fn push_source_option(target: &mut Vec<Value>, id: &str, summary: &JournalSourceSummary) {
1766 if summary.files == 0 {
1767 return;
1768 }
1769 target.push(json!({
1770 "id": id,
1771 "name": id,
1772 "info": summary.info(),
1773 "pill": human_binary_size(summary.total_size),
1774 }));
1775}
1776
1777fn expand_located_row_payloads(
1778 located: &mut LocatedRow,
1779 reader_options: ReaderOptions,
1780) -> Result<()> {
1781 let mut reader = FileReader::open_with_options(&located.file_path, reader_options)?;
1782 reader.seek_cursor(&located.row.cursor)?;
1783 if !reader.test_cursor(&located.row.cursor)? {
1784 return Err(SdkError::InvalidCursor(format!(
1785 "selected row cursor is no longer available: {}",
1786 located.row.cursor
1787 )));
1788 }
1789 reader.collect_entry_payloads(&mut located.row.payloads)
1790}
1791
/// Accumulated result of querying several journal files.
#[derive(Debug, Default)]
struct CombinedResult {
    /// Rows collected so far, each tagged with its source file.
    rows: Vec<LocatedRow>,
    /// Facet counts: field name -> (value -> count), merged across files.
    facets: BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
    /// Histogram merged across files, if any file produced one.
    histogram: Option<ExplorerHistogram>,
    /// Names of column fields seen so far (only valid UTF-8 names are kept by `merge`).
    column_fields: BTreeSet<String>,
    /// Statistics merged across files.
    stats: ExplorerStats,
    /// Paging counters, when available. NOTE(review): set outside this chunk.
    page_counters: Option<NetdataPageCounters>,
    /// NOTE(review): presumably the number of files that matched the request; set outside this chunk.
    matched_files: u64,
    /// NOTE(review): presumably the paths of the matched files; set outside this chunk.
    matched_paths: Vec<PathBuf>,
    /// NOTE(review): presumably the number of files skipped; set outside this chunk.
    skipped_files: u64,
    /// Human-readable error messages (e.g. rows whose payloads could not be expanded).
    file_errors: Vec<String>,
    /// Set when the result is incomplete, e.g. after a row expansion failure.
    partial: bool,
    /// NOTE(review): presumably set when the query timed out; set outside this chunk.
    timed_out: bool,
    /// NOTE(review): presumably set when the query was cancelled; set outside this chunk.
    cancelled: bool,
    /// NOTE(review): presumably records whether sampling was active; set outside this chunk.
    sampling_enabled: bool,
}
1809
/// Paging counters reported by `NetdataPageWindow::counters`.
#[derive(Clone, Copy, Debug, Default)]
struct NetdataPageCounters {
    /// Entries that passed the anchor checks.
    matched: u64,
    /// Entries counted as skipped "before" the page.
    before: u64,
    /// Entries counted as skipped "after" the page, plus entries shifted out of it.
    after: u64,
}
1816
/// Position and timing information passed along for progress reporting.
// NOTE(review): the field meanings below are inferred from their names; the users
// of this struct are outside this chunk.
#[derive(Clone, Copy)]
struct ProgressContext {
    /// Index of the file currently being processed (base not visible here).
    current_file: usize,
    /// Total number of files to process.
    total_files: usize,
    /// Instant at which processing started.
    started: Instant,
}
1823
/// Heap of retained page timestamps, oriented by traversal direction so that
/// `peek()` always yields the entry to evict first.
#[derive(Debug)]
enum NetdataPageHeap {
    /// Min-heap (via `Reverse`): `peek()` is the oldest retained timestamp.
    Backward(BinaryHeap<Reverse<u64>>),
    /// Max-heap: `peek()` is the newest retained timestamp.
    Forward(BinaryHeap<u64>),
}
1829
/// Tracks which timestamps belong to the current result page and counts the
/// entries that fall outside of it.
#[derive(Debug)]
struct NetdataPageWindow {
    /// Traversal direction of the request.
    direction: Direction,
    /// Realtime anchor for non-tail requests; entries at or beyond it are excluded.
    anchor_start_usec: Option<u64>,
    /// Realtime anchor for tail requests; entries at or beyond it are excluded.
    anchor_stop_usec: Option<u64>,
    /// Maximum number of timestamps retained (the request's row limit).
    limit: usize,
    /// Retained timestamps, ordered so the eviction candidate is at the top.
    heap: NetdataPageHeap,
    /// Smallest retained timestamp, if any.
    oldest_retained_usec: Option<u64>,
    /// Largest retained timestamp, if any.
    newest_retained_usec: Option<u64>,
    /// Number of observed entries that passed the anchor checks.
    matched: u64,
    /// Entries counted as skipped before the page.
    skips_before: u64,
    /// Entries counted as skipped after the page.
    skips_after: u64,
    /// Number of times a retained timestamp was replaced by a new one.
    shifts: u64,
}
1844
1845impl NetdataPageWindow {
1846 fn for_request(request: &NetdataRequest) -> Self {
1847 let (anchor_start_usec, anchor_stop_usec) = match (request.tail, request.anchor) {
1848 (true, ExplorerAnchor::Realtime(anchor)) => (None, Some(anchor)),
1849 (false, ExplorerAnchor::Realtime(anchor)) => (Some(anchor), None),
1850 _ => (None, None),
1851 };
1852 let heap = match request.direction {
1853 Direction::Backward => NetdataPageHeap::Backward(BinaryHeap::new()),
1854 Direction::Forward => NetdataPageHeap::Forward(BinaryHeap::new()),
1855 };
1856 Self {
1857 direction: request.direction,
1858 anchor_start_usec,
1859 anchor_stop_usec,
1860 limit: request.limit,
1861 heap,
1862 oldest_retained_usec: None,
1863 newest_retained_usec: None,
1864 matched: 0,
1865 skips_before: 0,
1866 skips_after: 0,
1867 shifts: 0,
1868 }
1869 }
1870
1871 fn candidate_to_keep(&self, realtime_usec: u64) -> bool {
1872 if self.limit == 0 || !self.entry_within_anchor_readonly(realtime_usec) {
1873 return false;
1874 }
1875 if self.retained_len() < self.limit {
1876 return true;
1877 }
1878 self.oldest_retained_usec
1879 .zip(self.newest_retained_usec)
1880 .is_some_and(|(oldest, newest)| realtime_usec >= oldest && realtime_usec <= newest)
1881 }
1882
    /// Records an entry at `realtime_usec`, updating the retained page and counters.
    ///
    /// Entries rejected by the anchors are counted by `entry_within_anchor` and
    /// otherwise ignored. Accepted entries fill the heap up to `limit`; once it is
    /// full, an entry either counts as a skip (it lies beyond the eviction
    /// candidate) or replaces the eviction candidate and counts as a shift.
    fn observe(&mut self, realtime_usec: u64) {
        // The anchor check runs first, so its skip counters update even when `limit` is 0.
        if !self.entry_within_anchor(realtime_usec) || self.limit == 0 {
            return;
        }
        self.matched = self.matched.saturating_add(1);
        match (&mut self.heap, self.direction) {
            (NetdataPageHeap::Backward(heap), Direction::Backward) => {
                // Page not full yet: retain unconditionally.
                if heap.len() < self.limit {
                    heap.push(Reverse(realtime_usec));
                    self.add_retained_bound(realtime_usec);
                    return;
                }
                // Top of the min-heap is the oldest retained timestamp.
                let Some(Reverse(oldest)) = heap.peek().copied() else {
                    heap.push(Reverse(realtime_usec));
                    self.refresh_retained_bounds();
                    return;
                };
                // Older than everything retained: it falls after the page.
                if realtime_usec < oldest {
                    self.skips_after = self.skips_after.saturating_add(1);
                    return;
                }
                // Otherwise it displaces the oldest retained entry.
                heap.pop();
                heap.push(Reverse(realtime_usec));
                self.refresh_retained_bounds();
                self.shifts = self.shifts.saturating_add(1);
            }
            (NetdataPageHeap::Forward(heap), Direction::Forward) => {
                // Page not full yet: retain unconditionally.
                if heap.len() < self.limit {
                    heap.push(realtime_usec);
                    self.add_retained_bound(realtime_usec);
                    return;
                }
                // Top of the max-heap is the newest retained timestamp.
                let Some(newest) = heap.peek().copied() else {
                    heap.push(realtime_usec);
                    self.refresh_retained_bounds();
                    return;
                };
                // Newer than everything retained: counted as a skip before the page.
                if realtime_usec > newest {
                    self.skips_before = self.skips_before.saturating_add(1);
                    return;
                }
                // Otherwise it displaces the newest retained entry.
                heap.pop();
                heap.push(realtime_usec);
                self.refresh_retained_bounds();
                self.shifts = self.shifts.saturating_add(1);
            }
            // Heap kind and direction are created together in `for_request`;
            // a mismatch is ignored.
            _ => {}
        }
    }
1932
1933 fn retained_len(&self) -> usize {
1934 match &self.heap {
1935 NetdataPageHeap::Backward(heap) => heap.len(),
1936 NetdataPageHeap::Forward(heap) => heap.len(),
1937 }
1938 }
1939
1940 fn add_retained_bound(&mut self, realtime_usec: u64) {
1941 self.oldest_retained_usec = Some(
1942 self.oldest_retained_usec
1943 .map_or(realtime_usec, |current| current.min(realtime_usec)),
1944 );
1945 self.newest_retained_usec = Some(
1946 self.newest_retained_usec
1947 .map_or(realtime_usec, |current| current.max(realtime_usec)),
1948 );
1949 }
1950
1951 fn refresh_retained_bounds(&mut self) {
1952 let (oldest, newest) = match &self.heap {
1953 NetdataPageHeap::Backward(heap) => heap
1954 .iter()
1955 .map(|Reverse(usec)| *usec)
1956 .fold((None, None), retained_bounds_fold),
1957 NetdataPageHeap::Forward(heap) => heap
1958 .iter()
1959 .copied()
1960 .fold((None, None), retained_bounds_fold),
1961 };
1962 self.oldest_retained_usec = oldest;
1963 self.newest_retained_usec = newest;
1964 }
1965
1966 fn entry_within_anchor(&mut self, realtime_usec: u64) -> bool {
1967 match self.direction {
1968 Direction::Backward => {
1969 if self
1970 .anchor_start_usec
1971 .is_some_and(|anchor| realtime_usec >= anchor)
1972 {
1973 self.skips_before = self.skips_before.saturating_add(1);
1974 return false;
1975 }
1976 if self
1977 .anchor_stop_usec
1978 .is_some_and(|anchor| realtime_usec <= anchor)
1979 {
1980 self.skips_after = self.skips_after.saturating_add(1);
1981 return false;
1982 }
1983 }
1984 Direction::Forward => {
1985 if self
1986 .anchor_start_usec
1987 .is_some_and(|anchor| realtime_usec <= anchor)
1988 {
1989 self.skips_after = self.skips_after.saturating_add(1);
1990 return false;
1991 }
1992 if self
1993 .anchor_stop_usec
1994 .is_some_and(|anchor| realtime_usec >= anchor)
1995 {
1996 self.skips_before = self.skips_before.saturating_add(1);
1997 return false;
1998 }
1999 }
2000 }
2001 true
2002 }
2003
2004 fn entry_within_anchor_readonly(&self, realtime_usec: u64) -> bool {
2005 match self.direction {
2006 Direction::Backward => {
2007 if self
2008 .anchor_start_usec
2009 .is_some_and(|anchor| realtime_usec >= anchor)
2010 {
2011 return false;
2012 }
2013 if self
2014 .anchor_stop_usec
2015 .is_some_and(|anchor| realtime_usec <= anchor)
2016 {
2017 return false;
2018 }
2019 }
2020 Direction::Forward => {
2021 if self
2022 .anchor_start_usec
2023 .is_some_and(|anchor| realtime_usec <= anchor)
2024 {
2025 return false;
2026 }
2027 if self
2028 .anchor_stop_usec
2029 .is_some_and(|anchor| realtime_usec >= anchor)
2030 {
2031 return false;
2032 }
2033 }
2034 }
2035 true
2036 }
2037
2038 fn counters(&self) -> NetdataPageCounters {
2039 NetdataPageCounters {
2040 matched: self.matched,
2041 before: self.skips_before,
2042 after: self.skips_after.saturating_add(self.shifts),
2043 }
2044 }
2045
2046 fn can_stop_delta_file(&self, realtime_usec: u64, slack_usec: u64) -> bool {
2047 if self.limit == 0 {
2048 return false;
2049 }
2050 match (&self.heap, self.direction) {
2051 (NetdataPageHeap::Backward(heap), Direction::Backward) => {
2052 if heap.len() < self.limit {
2053 return false;
2054 }
2055 heap.peek().is_some_and(|Reverse(oldest)| {
2056 realtime_usec < oldest.saturating_sub(slack_usec)
2057 })
2058 }
2059 (NetdataPageHeap::Forward(heap), Direction::Forward) => {
2060 if heap.len() < self.limit {
2061 return false;
2062 }
2063 heap.peek()
2064 .is_some_and(|newest| realtime_usec > newest.saturating_add(slack_usec))
2065 }
2066 _ => false,
2067 }
2068 }
2069}
2070
/// Fold step that widens an `(oldest, newest)` pair to include `realtime_usec`.
///
/// Unset bounds are initialized to the timestamp, so both results are always `Some`.
fn retained_bounds_fold(
    (oldest, newest): (Option<u64>, Option<u64>),
    realtime_usec: u64,
) -> (Option<u64>, Option<u64>) {
    let widened_oldest = match oldest {
        Some(current) if current < realtime_usec => current,
        _ => realtime_usec,
    };
    let widened_newest = match newest {
        Some(current) if current > realtime_usec => current,
        _ => realtime_usec,
    };
    (Some(widened_oldest), Some(widened_newest))
}
2080
2081impl CombinedResult {
2082 fn merge(
2083 &mut self,
2084 path: &Path,
2085 result: ExplorerResult,
2086 direction: Direction,
2087 limit: usize,
2088 ) -> Result<()> {
2089 let ExplorerResult {
2090 rows,
2091 facets,
2092 histogram,
2093 stats,
2094 column_fields,
2095 ..
2096 } = result;
2097
2098 if let Some(histogram) = histogram {
2099 merge_histogram(&mut self.histogram, histogram)?;
2100 }
2101
2102 self.merge_stats(stats);
2103 for row in rows {
2104 self.rows.push(LocatedRow {
2105 file_path: path.to_path_buf(),
2106 row,
2107 });
2108 }
2109 for field in column_fields {
2110 if let Ok(field) = String::from_utf8(field) {
2111 self.column_fields.insert(field);
2112 }
2113 }
2114 for (field, values) in facets {
2115 let target = self.facets.entry(field).or_default();
2116 for (value, count) in values {
2117 add_netdata_facet_count(target, &value, count);
2118 }
2119 }
2120 self.sort_and_limit(direction, limit);
2121 Ok(())
2122 }
2123
2124 fn add_column_fields<I>(&mut self, fields: I)
2125 where
2126 I: IntoIterator<Item = String>,
2127 {
2128 self.column_fields.extend(fields);
2129 }
2130
2131 fn sort_and_limit(&mut self, direction: Direction, limit: usize) {
2132 match direction {
2133 Direction::Forward => self.rows.sort_by_key(|row| row.row.realtime_usec),
2134 Direction::Backward => self
2135 .rows
2136 .sort_by(|left, right| right.row.realtime_usec.cmp(&left.row.realtime_usec)),
2137 }
2138 make_row_timestamps_unique(&mut self.rows, direction);
2139 if self.rows.len() > limit {
2140 self.rows.truncate(limit);
2141 }
2142 self.stats.rows_returned = self.rows.len() as u64;
2143 }
2144
2145 fn expand_row_payloads(&mut self, reader_options: ReaderOptions) {
2146 if self.rows.is_empty() {
2147 self.stats.rows_returned = 0;
2148 return;
2149 }
2150
2151 let mut rows = Vec::with_capacity(self.rows.len());
2152 for mut located in self.rows.drain(..) {
2153 if !located.row.payloads.is_empty() {
2154 rows.push(located);
2155 continue;
2156 }
2157 match expand_located_row_payloads(&mut located, reader_options) {
2158 Ok(()) => {
2159 self.stats.returned_row_expansions =
2160 self.stats.returned_row_expansions.saturating_add(1);
2161 rows.push(located);
2162 }
2163 Err(err) => {
2164 self.partial = true;
2165 self.file_errors.push(format!(
2166 "{} cursor {}: {err}",
2167 located.file_path.display(),
2168 located.row.cursor
2169 ));
2170 }
2171 }
2172 }
2173 self.rows = rows;
2174 self.stats.rows_returned = self.rows.len() as u64;
2175 }
2176
2177 fn merge_stats(&mut self, stats: ExplorerStats) {
2178 self.stats.rows_examined = self.stats.rows_examined.saturating_add(stats.rows_examined);
2179 self.stats.rows_matched = self.stats.rows_matched.saturating_add(stats.rows_matched);
2180 self.stats.facet_rows_matched = self
2181 .stats
2182 .facet_rows_matched
2183 .saturating_add(stats.facet_rows_matched);
2184 self.stats.rows_returned = self.stats.rows_returned.saturating_add(stats.rows_returned);
2185 self.stats.rows_unsampled = self
2186 .stats
2187 .rows_unsampled
2188 .saturating_add(stats.rows_unsampled);
2189 self.stats.rows_estimated = self
2190 .stats
2191 .rows_estimated
2192 .saturating_add(stats.rows_estimated);
2193 self.stats.sampling_sampled = self
2194 .stats
2195 .sampling_sampled
2196 .saturating_add(stats.sampling_sampled);
2197 self.stats.sampling_unsampled = self
2198 .stats
2199 .sampling_unsampled
2200 .saturating_add(stats.sampling_unsampled);
2201 self.stats.sampling_estimated = self
2202 .stats
2203 .sampling_estimated
2204 .saturating_add(stats.sampling_estimated);
2205 if stats.last_realtime_usec > self.stats.last_realtime_usec {
2206 self.stats.last_realtime_usec = stats.last_realtime_usec;
2207 }
2208 if stats.max_source_realtime_delta_usec > self.stats.max_source_realtime_delta_usec {
2209 self.stats.max_source_realtime_delta_usec = stats.max_source_realtime_delta_usec;
2210 }
2211 self.stats.data_refs_seen = self
2212 .stats
2213 .data_refs_seen
2214 .saturating_add(stats.data_refs_seen);
2215 self.stats.data_refs_skipped = self
2216 .stats
2217 .data_refs_skipped
2218 .saturating_add(stats.data_refs_skipped);
2219 self.stats.data_payloads_loaded = self
2220 .stats
2221 .data_payloads_loaded
2222 .saturating_add(stats.data_payloads_loaded);
2223 self.stats.data_objects_classified = self
2224 .stats
2225 .data_objects_classified
2226 .saturating_add(stats.data_objects_classified);
2227 self.stats.data_cache_hits = self
2228 .stats
2229 .data_cache_hits
2230 .saturating_add(stats.data_cache_hits);
2231 self.stats.data_cache_misses = self
2232 .stats
2233 .data_cache_misses
2234 .saturating_add(stats.data_cache_misses);
2235 self.stats.payloads_decompressed = self
2236 .stats
2237 .payloads_decompressed
2238 .saturating_add(stats.payloads_decompressed);
2239 self.stats.fts_scans = self.stats.fts_scans.saturating_add(stats.fts_scans);
2240 self.stats.facet_updates = self.stats.facet_updates.saturating_add(stats.facet_updates);
2241 self.stats.histogram_updates = self
2242 .stats
2243 .histogram_updates
2244 .saturating_add(stats.histogram_updates);
2245 self.stats.returned_row_expansions = self
2246 .stats
2247 .returned_row_expansions
2248 .saturating_add(stats.returned_row_expansions);
2249 self.stats.early_stop_opportunities = self
2250 .stats
2251 .early_stop_opportunities
2252 .saturating_add(stats.early_stop_opportunities);
2253 self.stats.early_stops = self.stats.early_stops.saturating_add(stats.early_stops);
2254 }
2255
2256 fn add_zero_count_facet_values(
2257 &mut self,
2258 vocabulary: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
2259 ) {
2260 for (field, values) in vocabulary {
2261 let target = self.facets.entry(field.clone()).or_default();
2262 for value in values.keys() {
2263 add_netdata_facet_count(target, value, 0);
2264 }
2265 }
2266 }
2267
2268 fn add_zero_count_facet_values_from_files(
2269 &mut self,
2270 fields: &[Vec<u8>],
2271 reader_options: ReaderOptions,
2272 ) {
2273 for path in &self.matched_paths {
2274 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2275 continue;
2276 };
2277 for field in fields {
2278 let Ok(field_name) = std::str::from_utf8(field) else {
2279 continue;
2280 };
2281 let Ok(values) = reader.query_unique(field_name) else {
2282 continue;
2283 };
2284 if values.is_empty() {
2285 continue;
2286 }
2287 let target = self.facets.entry(field.clone()).or_default();
2288 for value in values {
2289 add_netdata_facet_count(target, &value, 0);
2290 }
2291 }
2292 }
2293 }
2294
2295 fn add_zero_count_selected_filter_values(&mut self, request: &NetdataRequest) {
2296 let mut report_fields: HashSet<Vec<u8>> = request.facets.iter().cloned().collect();
2297 if let Some(histogram) = &request.histogram {
2298 report_fields.insert(histogram.as_bytes().to_vec());
2299 }
2300 for filter in &request.filters {
2301 if !report_fields.contains(&filter.field) {
2302 continue;
2303 }
2304 let target = self.facets.entry(filter.field.clone()).or_default();
2305 for value in &filter.values {
2306 add_netdata_facet_count(target, value, 0);
2307 }
2308 }
2309 }
2310
2311 fn reportable_facet_fields(&self, requested: &[Vec<u8>]) -> Vec<String> {
2312 string_fields(&self.reportable_facet_fields_bytes(requested))
2313 }
2314
2315 fn reportable_facet_fields_bytes(&self, requested: &[Vec<u8>]) -> Vec<Vec<u8>> {
2316 requested
2317 .iter()
2318 .filter(|field| {
2319 self.facets
2320 .get(*field)
2321 .is_some_and(facet_group_is_reportable)
2322 })
2323 .cloned()
2324 .collect()
2325 }
2326}
2327
2328fn netdata_function_error(status: u64, message: &str) -> Value {
2329 json!({
2330 "status": status,
2331 "errorMessage": message,
2332 })
2333}
2334
2335fn query_message(timed_out: bool, stats: &ExplorerStats) -> Value {
2336 if !timed_out && stats.rows_unsampled == 0 && stats.rows_estimated == 0 {
2337 return Value::String("OK".to_string());
2338 }
2339
2340 let total = stats
2341 .rows_examined
2342 .saturating_add(stats.rows_unsampled)
2343 .saturating_add(stats.rows_estimated)
2344 .max(1);
2345 let real_percent = stats.rows_examined as f64 * 100.0 / total as f64;
2346 let unsampled_percent = stats.rows_unsampled as f64 * 100.0 / total as f64;
2347 let estimated_percent = stats.rows_estimated as f64 * 100.0 / total as f64;
2348
2349 let mut title = String::new();
2350 let mut description = String::new();
2351 let mut status = "notice";
2352 if timed_out {
2353 title.push_str("Query timed-out, incomplete data. ");
2354 description.push_str(
2355 "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ",
2356 );
2357 status = "warning";
2358 }
2359 if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2360 title.push_str(&format!("{real_percent:.2}% real data"));
2361 description.push_str(&format!(
2362 "ACTUAL DATA: The filters counters reflect {real_percent:.2}% of the data. "
2363 ));
2364 }
2365 if stats.rows_unsampled != 0 {
2366 title.push_str(&format!(", {unsampled_percent:.2}% unsampled"));
2367 description.push_str(&format!(
2368 "UNSAMPLED DATA: {unsampled_percent:.2}% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
2369 ));
2370 }
2371 if stats.rows_estimated != 0 {
2372 title.push_str(&format!(", {estimated_percent:.2}% estimated"));
2373 description.push_str(&format!(
2374 "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by {estimated_percent:.2}%. "
2375 ));
2376 }
2377
2378 json!({
2379 "title": title,
2380 "status": status,
2381 "description": description,
2382 })
2383}
2384
2385fn merged_progress_stats(completed: &ExplorerStats, current: &ExplorerStats) -> ExplorerStats {
2386 let mut stats = completed.clone();
2387 stats.rows_examined = stats.rows_examined.saturating_add(current.rows_examined);
2388 stats.rows_matched = stats.rows_matched.saturating_add(current.rows_matched);
2389 stats.facet_rows_matched = stats
2390 .facet_rows_matched
2391 .saturating_add(current.facet_rows_matched);
2392 stats.rows_returned = stats.rows_returned.saturating_add(current.rows_returned);
2393 stats.rows_unsampled = stats.rows_unsampled.saturating_add(current.rows_unsampled);
2394 stats.rows_estimated = stats.rows_estimated.saturating_add(current.rows_estimated);
2395 stats.sampling_sampled = stats
2396 .sampling_sampled
2397 .saturating_add(current.sampling_sampled);
2398 stats.sampling_unsampled = stats
2399 .sampling_unsampled
2400 .saturating_add(current.sampling_unsampled);
2401 stats.sampling_estimated = stats
2402 .sampling_estimated
2403 .saturating_add(current.sampling_estimated);
2404 if current.last_realtime_usec > stats.last_realtime_usec {
2405 stats.last_realtime_usec = current.last_realtime_usec;
2406 }
2407 if current.max_source_realtime_delta_usec > stats.max_source_realtime_delta_usec {
2408 stats.max_source_realtime_delta_usec = current.max_source_realtime_delta_usec;
2409 }
2410 stats.data_refs_seen = stats.data_refs_seen.saturating_add(current.data_refs_seen);
2411 stats.data_refs_skipped = stats
2412 .data_refs_skipped
2413 .saturating_add(current.data_refs_skipped);
2414 stats.data_payloads_loaded = stats
2415 .data_payloads_loaded
2416 .saturating_add(current.data_payloads_loaded);
2417 stats.data_objects_classified = stats
2418 .data_objects_classified
2419 .saturating_add(current.data_objects_classified);
2420 stats.data_cache_hits = stats
2421 .data_cache_hits
2422 .saturating_add(current.data_cache_hits);
2423 stats.data_cache_misses = stats
2424 .data_cache_misses
2425 .saturating_add(current.data_cache_misses);
2426 stats.payloads_decompressed = stats
2427 .payloads_decompressed
2428 .saturating_add(current.payloads_decompressed);
2429 stats.fts_scans = stats.fts_scans.saturating_add(current.fts_scans);
2430 stats.facet_updates = stats.facet_updates.saturating_add(current.facet_updates);
2431 stats.histogram_updates = stats
2432 .histogram_updates
2433 .saturating_add(current.histogram_updates);
2434 stats.returned_row_expansions = stats
2435 .returned_row_expansions
2436 .saturating_add(current.returned_row_expansions);
2437 stats.early_stop_opportunities = stats
2438 .early_stop_opportunities
2439 .saturating_add(current.early_stop_opportunities);
2440 stats.early_stops = stats.early_stops.saturating_add(current.early_stops);
2441 stats
2442}
2443
/// Column descriptors of a query response.
struct Columns {
    /// Column names; presumably the order in which columns are laid out in
    /// each data row — confirm against the code that builds this struct.
    order: Vec<String>,
    /// JSON object emitted as the response's `"columns"` member.
    map: Map<String, Value>,
}
2448
/// Pre-rendered pieces of a query response, assembled into the final JSON
/// object by the response helpers.
struct QueryResponseArtifacts {
    /// Names of the facet fields being reported (NOTE(review): consumer is not
    /// visible in this part of the file).
    reportable_facet_field_names: Vec<String>,
    /// Column descriptors; `columns.map` is emitted as `"columns"`.
    columns: Columns,
    /// Rendered rows, emitted as `"data"`.
    data: Vec<Value>,
    /// Rendered facets, emitted as `"facets"` or `"facets_delta"`.
    facets: Value,
    /// Rendered histogram, emitted as `"histogram"` or `"histogram_delta"`
    /// (JSON `null` when absent).
    histogram: Option<Value>,
    /// Response message (NOTE(review): presumably the output of
    /// `query_message` — confirm at the construction site).
    message: Value,
    /// Item counters, emitted as `"items"` or `"items_delta"`.
    items: Value,
}
2458
2459fn base_query_response(
2460 request: &NetdataRequest,
2461 combined: &CombinedResult,
2462 artifacts: &QueryResponseArtifacts,
2463) -> Value {
2464 json!({
2465 "_request": &request.echo,
2466 "versions": { "netdata_function_api": 1, "sdk": env!("CARGO_PKG_VERSION") },
2467 "_journal_files": {
2468 "matched": combined.matched_files,
2469 "skipped": combined.skipped_files,
2470 "errors": &combined.file_errors,
2471 },
2472 "status": 200,
2473 "partial": combined.partial,
2474 "type": "table",
2475 "show_ids": true,
2476 "has_history": true,
2477 "pagination": {
2478 "enabled": true,
2479 "key": "anchor",
2480 "column": "timestamp",
2481 "units": "timestamp_usec",
2482 },
2483 "columns": &artifacts.columns.map,
2484 "data": &artifacts.data,
2485 "_stats": {
2486 "sdk_explorer": &combined.stats,
2487 },
2488 "expires": if request.data_only {
2489 unix_now_seconds().saturating_add(3600)
2490 } else {
2491 0
2492 }
2493 })
2494}
2495
2496fn response_items(request: &NetdataRequest, combined: &CombinedResult, returned: u64) -> Value {
2497 let unsampled = combined.stats.rows_unsampled;
2498 let estimated = combined.stats.rows_estimated;
2499 let fallback_rows_after_returned =
2500 response_fallback_rows_after_returned(&combined.stats, returned);
2501 let page_counters = combined
2502 .page_counters
2503 .unwrap_or_else(|| NetdataPageCounters {
2504 matched: combined.stats.rows_matched,
2505 before: 0,
2506 after: fallback_rows_after_returned,
2507 });
2508 json!({
2509 "evaluated": combined.stats.rows_examined.saturating_add(unsampled).saturating_add(estimated),
2510 "matched": page_counters.matched.saturating_add(unsampled).saturating_add(estimated),
2511 "unsampled": unsampled,
2512 "estimated": estimated,
2513 "returned": returned,
2514 "max_to_return": request.limit as u64,
2515 "before": page_counters.before,
2516 "after": page_counters.after,
2517 })
2518}
2519
2520fn response_fallback_rows_after_returned(stats: &ExplorerStats, returned: u64) -> u64 {
2521 let source_rows = if stats.rows_unsampled != 0 || stats.rows_estimated != 0 {
2522 stats.rows_examined
2523 } else {
2524 stats.rows_matched
2525 };
2526 source_rows.saturating_sub(returned)
2527}
2528
2529fn add_last_modified_if_needed(
2530 object: &mut Map<String, Value>,
2531 request: &NetdataRequest,
2532 combined: &CombinedResult,
2533) {
2534 if !request.data_only || request.tail {
2535 object.insert(
2536 "last_modified".to_string(),
2537 Value::from(combined.stats.last_realtime_usec),
2538 );
2539 }
2540}
2541
2542fn add_sampling_if_needed(object: &mut Map<String, Value>, combined: &CombinedResult) {
2543 if combined.sampling_enabled {
2544 object.insert(
2545 "_sampling".to_string(),
2546 json!({
2547 "enabled": true,
2548 "sampled": combined.stats.sampling_sampled,
2549 "unsampled": combined.stats.sampling_unsampled,
2550 "estimated": combined.stats.sampling_estimated,
2551 }),
2552 );
2553 }
2554}
2555
2556fn add_analysis_outputs_if_needed(
2557 object: &mut Map<String, Value>,
2558 request: &NetdataRequest,
2559 artifacts: QueryResponseArtifacts,
2560) {
2561 if !request.data_only || request.delta {
2562 let (facets_key, histogram_key, items_key) = response_analysis_keys(request.data_only);
2563 object.insert(facets_key.to_string(), artifacts.facets);
2564 object.insert(
2565 histogram_key.to_string(),
2566 artifacts.histogram.unwrap_or(Value::Null),
2567 );
2568 object.insert(items_key.to_string(), artifacts.items);
2569 }
2570}
2571
/// Returns the JSON member names used for (facets, histogram, items):
/// the `*_delta` variants for data-only responses, the plain names otherwise.
fn response_analysis_keys(data_only: bool) -> (&'static str, &'static str, &'static str) {
    if !data_only {
        return ("facets", "histogram", "items");
    }
    ("facets_delta", "histogram_delta", "items_delta")
}
2579
2580fn merge_histogram(
2581 target: &mut Option<ExplorerHistogram>,
2582 source: ExplorerHistogram,
2583) -> Result<()> {
2584 let Some(target) = target else {
2585 *target = Some(source);
2586 return Ok(());
2587 };
2588 if target.field != source.field
2589 || target.buckets.len() != source.buckets.len()
2590 || target
2591 .buckets
2592 .iter()
2593 .zip(source.buckets.iter())
2594 .any(|(target_bucket, source_bucket)| {
2595 target_bucket.start_realtime_usec != source_bucket.start_realtime_usec
2596 || target_bucket.end_realtime_usec != source_bucket.end_realtime_usec
2597 })
2598 {
2599 return Err(SdkError::Unsupported(
2600 "inconsistent Netdata histogram bucket shape",
2601 ));
2602 }
2603 for (index, source_bucket) in source.buckets.into_iter().enumerate() {
2604 let Some(target_bucket) = target.buckets.get_mut(index) else {
2605 return Err(SdkError::Unsupported(
2606 "inconsistent Netdata histogram bucket shape",
2607 ));
2608 };
2609 for (value, count) in source_bucket.values {
2610 *target_bucket.values.entry(value).or_default() += count;
2611 }
2612 }
2613 Ok(())
2614}
2615
/// A facet group is reportable when at least one of its values is neither
/// empty nor the placeholder `-`. Counts are not taken into account.
fn facet_group_is_reportable(values: &BTreeMap<Vec<u8>, u64>) -> bool {
    values
        .keys()
        .any(|value| !(value.is_empty() || value.as_slice() == b"-"))
}
2621
2622fn netdata_facet_value(value: &[u8]) -> &[u8] {
2623 if value.len() > NETDATA_FACET_MAX_VALUE_LENGTH {
2624 &value[..NETDATA_FACET_MAX_VALUE_LENGTH]
2625 } else {
2626 value
2627 }
2628}
2629
2630fn add_netdata_facet_count(target: &mut BTreeMap<Vec<u8>, u64>, value: &[u8], count: u64) {
2631 *target
2632 .entry(netdata_facet_value(value).to_vec())
2633 .or_default() += count;
2634}
2635
2636fn not_modified_before_scan_response(
2637 request: &NetdataRequest,
2638 selected: &SelectedJournalFiles,
2639) -> Option<Value> {
2640 if request.if_modified_since_usec != 0 && !selected.files_are_newer {
2641 Some(netdata_function_error(
2642 304,
2643 "No new data since the previous call.",
2644 ))
2645 } else {
2646 None
2647 }
2648}
2649
2650fn should_collect_unfiltered_facet_vocabulary(
2651 request: &NetdataRequest,
2652 combined: &CombinedResult,
2653) -> bool {
2654 !request.data_only && !combined.partial && !request.filters.is_empty()
2655}
2656
2657fn progress_context(file_index: usize, total_files: usize, started: Instant) -> ProgressContext {
2658 ProgressContext {
2659 current_file: file_index + 1,
2660 total_files,
2661 started,
2662 }
2663}
2664
2665fn emit_netdata_progress(
2666 options: &mut NetdataFunctionRunOptions<'_>,
2667 progress: NetdataFunctionProgress,
2668) {
2669 if let Some(callback) = options.progress_callback.as_deref_mut() {
2670 callback(progress);
2671 }
2672}
2673
2674fn emit_progress_for_combined(
2675 options: &mut NetdataFunctionRunOptions<'_>,
2676 combined: &CombinedResult,
2677 context: ProgressContext,
2678) {
2679 emit_netdata_progress(
2680 options,
2681 NetdataFunctionProgress {
2682 current_file: context.current_file,
2683 total_files: context.total_files,
2684 matched_files: combined.matched_files,
2685 skipped_files: combined.skipped_files,
2686 stats: combined.stats.clone(),
2687 elapsed: context.started.elapsed(),
2688 },
2689 );
2690}
2691
2692fn emit_explorer_progress(
2693 options: &mut NetdataFunctionRunOptions<'_>,
2694 combined: &CombinedResult,
2695 progress: ExplorerProgress,
2696 context: ProgressContext,
2697) {
2698 let stats = merged_progress_stats(&combined.stats, &progress.stats);
2699 if let Some(callback) = options.progress_callback.as_deref_mut() {
2700 callback(NetdataFunctionProgress {
2701 current_file: context.current_file,
2702 total_files: context.total_files,
2703 matched_files: combined.matched_files,
2704 skipped_files: combined.skipped_files,
2705 stats,
2706 elapsed: context.started.elapsed(),
2707 });
2708 }
2709}
2710
2711fn request_cancelled(options: &NetdataFunctionRunOptions<'_>) -> bool {
2712 options
2713 .cancellation_callback
2714 .is_some_and(|is_cancelled| is_cancelled())
2715}
2716
2717fn should_stop_before_file(
2718 combined: &mut CombinedResult,
2719 deadline: Option<Instant>,
2720 options: &NetdataFunctionRunOptions<'_>,
2721) -> bool {
2722 if request_cancelled(options) {
2723 combined.partial = true;
2724 combined.cancelled = true;
2725 return true;
2726 }
2727 if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
2728 combined.partial = true;
2729 combined.timed_out = true;
2730 return true;
2731 }
2732 false
2733}
2734
2735fn collect_column_fields_for_file(
2736 reader: &mut FileReader,
2737 request: &NetdataRequest,
2738 path: &Path,
2739 combined: &mut CombinedResult,
2740) {
2741 if request.data_only {
2742 return;
2743 }
2744 match reader.enumerate_fields_indexed() {
2745 Ok(fields) => combined.add_column_fields(fields),
2746 Err(err) => combined.file_errors.push(format!(
2747 "{}: FIELD index enumeration failed: {err}",
2748 path.display()
2749 )),
2750 }
2751}
2752
2753fn record_explore_result(
2754 result: Result<(ExplorerResult, Option<ExplorerStopReason>)>,
2755 path: &Path,
2756 combined: &mut CombinedResult,
2757) -> Option<(ExplorerResult, Option<ExplorerStopReason>)> {
2758 match result {
2759 Ok(result) => Some(result),
2760 Err(err) => {
2761 combined.skipped_files = combined.skipped_files.saturating_add(1);
2762 combined
2763 .file_errors
2764 .push(format!("{}: {err}", path.display()));
2765 None
2766 }
2767 }
2768}
2769
2770fn delta_scan_can_stop(
2771 request: &NetdataRequest,
2772 page_window: &RefCell<NetdataPageWindow>,
2773 realtime_usec: u64,
2774 rows_matched: u64,
2775 realtime_delta_usec: u64,
2776) -> bool {
2777 let mut page_window = page_window.borrow_mut();
2778 page_window.observe(realtime_usec);
2779 request.data_only
2780 && request.delta
2781 && rows_matched % DATA_ONLY_CHECK_EVERY_ROWS == 0
2782 && page_window.can_stop_delta_file(realtime_usec, realtime_delta_usec)
2783}
2784
2785#[allow(clippy::too_many_arguments)]
2786fn finish_explored_file(
2787 options: &mut NetdataFunctionRunOptions<'_>,
2788 request: &NetdataRequest,
2789 file: &SelectedJournalFile,
2790 query: &ExplorerQuery,
2791 result: ExplorerResult,
2792 stop_reason: Option<ExplorerStopReason>,
2793 combined: &mut CombinedResult,
2794 files: &[SelectedJournalFile],
2795 file_index: usize,
2796 progress: ProgressContext,
2797) -> Result<bool> {
2798 update_learned_realtime_delta(options, &file.path, &file.order, &result.stats);
2799 combined.merge(&file.path, result, query.direction, request.limit)?;
2800 emit_progress_for_combined(options, combined, progress);
2801 if request_cancelled(options) {
2802 combined.partial = true;
2803 combined.cancelled = true;
2804 return Ok(true);
2805 }
2806 if let Some(reason) = stop_reason {
2807 combined.partial = true;
2808 match reason {
2809 ExplorerStopReason::TimedOut => combined.timed_out = true,
2810 ExplorerStopReason::Cancelled => combined.cancelled = true,
2811 }
2812 return Ok(true);
2813 }
2814 Ok(request.data_only
2815 && !request.delta
2816 && !request.tail
2817 && combined.rows.len() >= request.limit
2818 && remaining_files_cannot_affect_data_page(combined, request, files, file_index + 1))
2819}
2820
2821fn file_metadata(
2822 options: &NetdataFunctionRunOptions<'_>,
2823 path: &Path,
2824) -> Option<NetdataJournalFileMetadata> {
2825 options
2826 .state
2827 .as_deref()
2828 .and_then(|state| state.file_metadata(path))
2829}
2830
2831fn update_learned_realtime_delta(
2832 options: &mut NetdataFunctionRunOptions<'_>,
2833 path: &Path,
2834 order: &JournalFileOrderInfo,
2835 stats: &ExplorerStats,
2836) {
2837 let learned_delta = stats.max_source_realtime_delta_usec;
2838 if learned_delta == 0 || learned_delta <= order.journal_vs_realtime_delta_usec {
2839 return;
2840 }
2841 let learned_delta = normalize_journal_vs_realtime_delta_usec(learned_delta);
2842 if learned_delta <= order.journal_vs_realtime_delta_usec {
2843 return;
2844 }
2845 if let Some(state) = options.state.as_deref_mut() {
2846 state.update_file_journal_vs_realtime_delta_usec(path, learned_delta);
2847 }
2848}
2849
2850fn normalize_journal_vs_realtime_delta_usec(delta_usec: u64) -> u64 {
2851 delta_usec
2852 .max(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC)
2853 .min(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC)
2854}
2855
/// Test-only check of whether a file's header time range can intersect the
/// request window.
///
/// The head/tail entry times are widened by the maximum journal-vs-realtime
/// delta on both sides. A file whose tail entry time is zero is always
/// treated as overlapping.
#[cfg(test)]
fn file_may_overlap_request(header: crate::FileHeader, request: &NetdataRequest) -> bool {
    if header.tail_entry_realtime == 0 {
        return true;
    }

    let earliest = header
        .head_entry_realtime
        .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
    let latest = header
        .tail_entry_realtime
        .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);

    let ends_before_window = request
        .after_realtime_usec
        .is_some_and(|after| latest < after);
    let starts_after_window = request
        .before_realtime_usec
        .is_some_and(|before| earliest > before);

    !(ends_before_window || starts_after_window)
}
2884
/// A journal file chosen for a request, together with the ordering
/// information used to sort and prune the file list.
#[derive(Debug)]
struct SelectedJournalFile {
    /// Location of the journal file.
    path: PathBuf,
    /// Time range and journal-vs-realtime delta computed for the file.
    order: JournalFileOrderInfo,
}
2890
/// Result of selecting journal files for a request.
#[derive(Debug, Default)]
struct SelectedJournalFiles {
    /// Selected files, sorted for the request's direction.
    files: Vec<SelectedJournalFile>,
    /// True when at least one selected file has a last message time greater
    /// than the request's `if_modified_since_usec`.
    files_are_newer: bool,
}
2896
2897fn select_journal_files_for_request(
2898 paths: Vec<PathBuf>,
2899 request: &NetdataRequest,
2900 reader_options: ReaderOptions,
2901 options: &NetdataFunctionRunOptions<'_>,
2902) -> SelectedJournalFiles {
2903 let mut selected = Vec::new();
2904 for path in paths {
2905 let metadata = file_metadata(options, &path);
2906 if !request.matches_source(&path, metadata.as_ref()) {
2907 continue;
2908 }
2909 let order = journal_file_order_info(&path, reader_options, metadata.as_ref());
2910 if !journal_file_order_may_overlap_request(&order, request) {
2911 continue;
2912 }
2913 selected.push(SelectedJournalFile { path, order });
2914 }
2915 selected.sort_by(|left, right| {
2916 compare_journal_file_order(&left.order, &right.order, request.direction)
2917 .then_with(|| left.path.cmp(&right.path))
2918 });
2919 let files_are_newer = selected
2920 .iter()
2921 .any(|file| file.order.msg_last_realtime_usec > request.if_modified_since_usec);
2922 SelectedJournalFiles {
2923 files: selected,
2924 files_are_newer,
2925 }
2926}
2927
2928fn remaining_files_cannot_affect_data_page(
2929 combined: &CombinedResult,
2930 request: &NetdataRequest,
2931 files: &[SelectedJournalFile],
2932 next_file_index: usize,
2933) -> bool {
2934 let Some(next) = files.get(next_file_index) else {
2935 return true;
2936 };
2937 let slack = next.order.journal_vs_realtime_delta_usec;
2938 match request.direction {
2939 Direction::Backward => {
2940 let Some(oldest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).min()
2941 else {
2942 return false;
2943 };
2944 next.order.msg_last_realtime_usec < oldest_retained.saturating_sub(slack)
2945 }
2946 Direction::Forward => {
2947 let Some(newest_retained) = combined.rows.iter().map(|row| row.row.realtime_usec).max()
2948 else {
2949 return false;
2950 };
2951 next.order.msg_first_realtime_usec > newest_retained.saturating_add(slack)
2952 }
2953 }
2954}
2955
2956fn journal_file_order_may_overlap_request(
2957 info: &JournalFileOrderInfo,
2958 request: &NetdataRequest,
2959) -> bool {
2960 if info.msg_last_realtime_usec == 0 {
2961 return true;
2962 }
2963
2964 let first = info
2965 .msg_first_realtime_usec
2966 .saturating_sub(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2967 let last = info
2968 .msg_last_realtime_usec
2969 .saturating_add(NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC);
2970
2971 if request
2972 .after_realtime_usec
2973 .is_some_and(|after| last < after)
2974 {
2975 return false;
2976 }
2977 if request
2978 .before_realtime_usec
2979 .is_some_and(|before| first > before)
2980 {
2981 return false;
2982 }
2983
2984 true
2985}
2986
2987fn collect_boot_first_realtime(
2988 paths: &[PathBuf],
2989 reader_options: ReaderOptions,
2990 needed_boot_ids: &BTreeSet<Vec<u8>>,
2991) -> BTreeMap<Vec<u8>, u64> {
2992 let mut out = BTreeMap::new();
2993 if needed_boot_ids.is_empty() {
2994 return out;
2995 }
2996 for path in paths {
2997 let Ok(mut reader) = FileReader::open_with_options(path, reader_options) else {
2998 continue;
2999 };
3000 let Ok(boot_ids) = reader.query_unique("_BOOT_ID") else {
3001 continue;
3002 };
3003 for boot_id in boot_ids {
3004 if !needed_boot_ids.contains(&boot_id) {
3005 continue;
3006 }
3007 let mut match_payload = b"_BOOT_ID=".to_vec();
3008 match_payload.extend_from_slice(&boot_id);
3009 reader.flush_matches();
3010 reader.add_match(&match_payload);
3011 reader.seek_head();
3012 if !reader.next().unwrap_or(false) {
3013 continue;
3014 }
3015 if let Ok(realtime) = reader.get_realtime_usec() {
3016 record_boot_first_realtime(&mut out, boot_id, realtime);
3017 }
3018 }
3019 reader.flush_matches();
3020 }
3021 out
3022}
3023
3024fn response_boot_ids(
3025 column_order: &[String],
3026 rows: &[LocatedRow],
3027 facets: &BTreeMap<Vec<u8>, BTreeMap<Vec<u8>, u64>>,
3028 histogram: Option<&ExplorerHistogram>,
3029) -> BTreeSet<Vec<u8>> {
3030 let mut boot_ids = BTreeSet::new();
3031 let row_needs_boot_id = column_order.iter().any(|field| field == "_BOOT_ID");
3032 if row_needs_boot_id {
3033 for row in rows {
3034 if let Some(values) = row_fields(row).get("_BOOT_ID") {
3035 boot_ids.extend(values.iter().cloned());
3036 }
3037 }
3038 }
3039 if let Some(values) = facets.get(b"_BOOT_ID".as_slice()) {
3040 boot_ids.extend(
3041 values
3042 .keys()
3043 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3044 .cloned(),
3045 );
3046 }
3047 if let Some(histogram) = histogram.filter(|histogram| histogram.field == b"_BOOT_ID") {
3048 for bucket in &histogram.buckets {
3049 boot_ids.extend(
3050 bucket
3051 .values
3052 .keys()
3053 .filter(|value| !value.is_empty() && value.as_slice() != b"-")
3054 .cloned(),
3055 );
3056 }
3057 }
3058 boot_ids
3059}
3060
/// Remembers `realtime_usec` for `boot_id` unless an earlier timestamp is
/// already recorded for it.
fn record_boot_first_realtime(
    target: &mut BTreeMap<Vec<u8>, u64>,
    boot_id: Vec<u8>,
    realtime_usec: u64,
) {
    target
        .entry(boot_id)
        .and_modify(|first| *first = (*first).min(realtime_usec))
        .or_insert(realtime_usec);
}
3071
3072fn row_fields(row: &LocatedRow) -> BTreeMap<String, Vec<Vec<u8>>> {
3073 let mut fields = BTreeMap::new();
3074 for payload in &row.row.payloads {
3075 let Some((field, value)) = split_payload(payload) else {
3076 continue;
3077 };
3078 fields
3079 .entry(String::from_utf8_lossy(field).into_owned())
3080 .or_insert_with(Vec::new)
3081 .push(value.to_vec());
3082 }
3083 fields.insert(
3084 "ND_JOURNAL_FILE".to_string(),
3085 vec![row.file_path.display().to_string().into_bytes()],
3086 );
3087 if !fields.contains_key("ND_JOURNAL_PROCESS") {
3088 let process = dynamic_process_name(&fields);
3089 if !process.is_empty() {
3090 fields.insert("ND_JOURNAL_PROCESS".to_string(), vec![process.into_bytes()]);
3091 }
3092 }
3093 fields
3094}
3095
3096fn dynamic_process_name(fields: &BTreeMap<String, Vec<Vec<u8>>>) -> String {
3097 let base = first_value(fields, "CONTAINER_NAME")
3098 .or_else(|| first_value(fields, "SYSLOG_IDENTIFIER"))
3099 .or_else(|| first_value(fields, "_COMM"))
3100 .map(|value| String::from_utf8_lossy(value).into_owned())
3101 .unwrap_or_default();
3102 if base.is_empty() {
3103 return "-".to_string();
3104 }
3105 let pid = first_value(fields, "_PID").map(|value| String::from_utf8_lossy(value).into_owned());
3106 match pid {
3107 Some(pid) if !pid.is_empty() => format!("{base}[{pid}]"),
3108 _ => base,
3109 }
3110}
3111
3112fn make_row_timestamps_unique(rows: &mut [LocatedRow], direction: Direction) {
3113 let mut last_from = 0u64;
3114 let mut last_to = 0u64;
3115 let mut initialized = false;
3116 for row in rows {
3117 let timestamp = row.row.realtime_usec;
3118 if initialized && timestamp >= last_from && timestamp <= last_to {
3119 match direction {
3120 Direction::Backward => {
3121 last_from = last_from.saturating_sub(1);
3122 row.row.realtime_usec = last_from;
3123 }
3124 Direction::Forward => {
3125 last_to = last_to.saturating_add(1);
3126 row.row.realtime_usec = last_to;
3127 }
3128 }
3129 } else {
3130 last_from = timestamp;
3131 last_to = timestamp;
3132 initialized = true;
3133 }
3134 }
3135}
3136
/// Returns the first value stored for `field`, or `None` when the field is
/// absent or has no values.
fn first_value<'a>(fields: &'a BTreeMap<String, Vec<Vec<u8>>>, field: &str) -> Option<&'a [u8]> {
    let values = fields.get(field)?;
    values.first().map(|value| value.as_slice())
}
3143
/// Splits a `FIELD=value` payload at the first `=` into `(field, value)`.
/// Returns `None` when the payload contains no `=`.
fn split_payload(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut parts = payload.splitn(2, |byte| *byte == b'=');
    let field = parts.next()?;
    let value = parts.next()?;
    Some((field, value))
}
3148
3149fn column_metadata(key: &str, index: usize) -> Value {
3150 let (visible, filter, full_width) = match key {
3151 "timestamp" => (true, "range", false),
3152 "rowOptions" => (false, "none", false),
3153 "_HOSTNAME" => (true, "facet", false),
3154 "ND_JOURNAL_PROCESS" | "MESSAGE" => (true, "none", key == "MESSAGE"),
3155 "ND_JOURNAL_FILE" | "_SOURCE_REALTIME_TIMESTAMP" => (false, "none", false),
3156 _ if systemd_column_is_facet(key) => (false, "facet", false),
3157 _ => (false, "none", false),
3158 };
3159 let column_type = if key == "timestamp" {
3160 "timestamp"
3161 } else if key == "rowOptions" {
3162 "none"
3163 } else {
3164 "string"
3165 };
3166 let visualization = if key == "rowOptions" {
3167 "rowOptions"
3168 } else {
3169 "value"
3170 };
3171 let mut metadata = json!({
3172 "index": index,
3173 "unique_key": key == "timestamp",
3174 "name": if key == "timestamp" { "Timestamp" } else { key },
3175 "visible": visible,
3176 "type": column_type,
3177 "visualization": visualization,
3178 "value_options": {
3179 "transform": if key == "timestamp" { "datetime_usec" } else { "none" },
3180 "decimal_points": 0,
3181 "default_value": if key == "timestamp" || key == "rowOptions" {
3182 Value::Null
3183 } else {
3184 Value::String("-".to_string())
3185 },
3186 },
3187 "sort": "ascending",
3188 "sortable": false,
3189 "sticky": false,
3190 "summary": "count",
3191 "filter": filter,
3192 "full_width": full_width,
3193 "wrap": key != "rowOptions",
3194 "default_expanded_filter": matches!(key, "PRIORITY" | "SYSLOG_FACILITY" | "MESSAGE_ID"),
3195 });
3196 if key == "rowOptions" {
3197 if let Some(object) = metadata.as_object_mut() {
3198 object.insert("dummy".to_string(), Value::Bool(true));
3199 }
3200 }
3201 metadata
3202}
3203
/// Decides whether a journal field is offered as a facet.
///
/// `MESSAGE_ID` always is. Otherwise a field is excluded when its name
/// contains `MESSAGE` or `TIMESTAMP`, or starts with `__`.
fn systemd_column_is_facet(key: &str) -> bool {
    let excluded = key.contains("MESSAGE") || key.contains("TIMESTAMP") || key.starts_with("__");
    key == "MESSAGE_ID" || !excluded
}
3213
3214fn sort_facet_options(field: &str, options: &mut [Value]) {
3215 options.sort_by(|left, right| {
3216 let left_id = left.get("id").and_then(Value::as_str).unwrap_or_default();
3217 let right_id = right.get("id").and_then(Value::as_str).unwrap_or_default();
3218 if field == "PRIORITY" {
3219 return parse_priority(left_id).cmp(&parse_priority(right_id));
3220 }
3221 let left_count = left
3222 .get("count")
3223 .and_then(Value::as_u64)
3224 .unwrap_or_default();
3225 let right_count = right
3226 .get("count")
3227 .and_then(Value::as_u64)
3228 .unwrap_or_default();
3229 right_count
3230 .cmp(&left_count)
3231 .then_with(|| left_id.cmp(right_id))
3232 });
3233}
3234
3235fn parse_fts_query_patterns(query: &str) -> (Vec<ExplorerFtsPattern>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
3236 let bytes = query.as_bytes();
3237 let mut index = 0usize;
3238 let mut terms = Vec::new();
3239 let mut positives = Vec::new();
3240 let mut negatives = Vec::new();
3241
3242 while let Some((pattern, negative)) = next_fts_pattern(bytes, &mut index) {
3243 push_fts_pattern(
3244 pattern,
3245 negative,
3246 &mut terms,
3247 &mut positives,
3248 &mut negatives,
3249 );
3250 }
3251
3252 (terms, positives, negatives)
3253}
3254
3255fn next_fts_pattern(bytes: &[u8], index: &mut usize) -> Option<(Vec<u8>, bool)> {
3256 while *index < bytes.len() {
3257 skip_fts_separators(bytes, index);
3258 let negative = consume_fts_negative_marker(bytes, index);
3259 let pattern = read_fts_pattern(bytes, index);
3260 if !pattern.is_empty() {
3261 return Some((pattern, negative));
3262 }
3263 }
3264 None
3265}
3266
/// Advances `*index` past any run of `|` separator bytes at that position.
fn skip_fts_separators(bytes: &[u8], index: &mut usize) {
    // An index beyond the end yields an empty tail, so nothing is skipped.
    let tail = bytes.get(*index..).unwrap_or_default();
    *index += tail.iter().take_while(|byte| **byte == b'|').count();
}
3272
/// Consumes a leading `!` at `*index`, if present.
///
/// Returns `true` (and advances `*index` by one) when the marker was there.
fn consume_fts_negative_marker(bytes: &[u8], index: &mut usize) -> bool {
    let negated = matches!(bytes.get(*index), Some(b'!'));
    if negated {
        *index += 1;
    }
    negated
}
3281
/// Reads one pattern starting at `*index`, stopping after an unescaped `|`
/// or at the end of input, and advances `*index` past what was consumed.
///
/// A backslash escapes the following byte, so `\|` yields a literal `|` and
/// `\\` a literal backslash. The escaping backslash and the terminating `|`
/// are not part of the returned pattern.
fn read_fts_pattern(bytes: &[u8], index: &mut usize) -> Vec<u8> {
    let mut collected = Vec::new();
    let mut pending_escape = false;
    while let Some(&current) = bytes.get(*index) {
        *index += 1;
        match (current, pending_escape) {
            (b'\\', false) => pending_escape = true,
            (b'|', false) => break,
            _ => {
                collected.push(current);
                pending_escape = false;
            }
        }
    }
    collected
}
3300
3301fn push_fts_pattern(
3302 pattern: Vec<u8>,
3303 negative: bool,
3304 terms: &mut Vec<ExplorerFtsPattern>,
3305 positives: &mut Vec<Vec<u8>>,
3306 negatives: &mut Vec<Vec<u8>>,
3307) {
3308 terms.push(ExplorerFtsPattern::substring(pattern.clone(), negative));
3309 if negative {
3310 negatives.push(pattern);
3311 } else {
3312 positives.push(pattern);
3313 }
3314}
3315
3316fn parse_filters(value: Option<&Value>) -> Vec<ExplorerFilter> {
3317 let Some(Value::Object(selections)) = value else {
3318 return Vec::new();
3319 };
3320 let mut filters = Vec::new();
3321 for (field, values) in selections {
3322 if matches!(field.as_str(), "query" | "source" | "__logs_sources") {
3323 continue;
3324 }
3325 let Some(values) = parse_string_array(Some(values)) else {
3326 continue;
3327 };
3328 filters.push(ExplorerFilter::new(
3329 field.as_bytes().to_vec(),
3330 values
3331 .into_iter()
3332 .map(|value| normalize_filter_value(field, &value)),
3333 ));
3334 }
3335 filters
3336}
3337
/// Log sources requested through the `__logs_sources` selection, as produced
/// by `parse_source_selection`.
#[derive(Debug, Clone)]
struct SourceSelection {
    /// Bitwise OR of the `SOURCE_TYPE_*` flags for every requested name that
    /// `source_type_for_name` recognizes.
    source_type: u64,
    /// Requested names that are not one of the recognized category names.
    exact_sources: Vec<String>,
}
3343
3344fn parse_source_selection(value: Option<&Value>) -> SourceSelection {
3345 let mut selection = SourceSelection {
3346 source_type: SOURCE_TYPE_ALL,
3347 exact_sources: Vec::new(),
3348 };
3349 let Some(Value::Object(selections)) = value else {
3350 return selection;
3351 };
3352 let Some(values) = parse_string_array(selections.get("__logs_sources")) else {
3353 return selection;
3354 };
3355 selection.source_type = 0;
3356 for value in values {
3357 match source_type_for_name(&value) {
3358 Some(source_type) => selection.source_type |= source_type,
3359 None => selection.exact_sources.push(value),
3360 }
3361 }
3362 selection
3363}
3364
3365fn source_type_for_name(value: &str) -> Option<u64> {
3366 match value {
3367 "all" => Some(SOURCE_TYPE_ALL),
3368 "all-local-logs" => Some(SOURCE_TYPE_LOCAL_ALL),
3369 "all-remote-systems" => Some(SOURCE_TYPE_REMOTE_ALL),
3370 "all-local-system-logs" => Some(SOURCE_TYPE_LOCAL_SYSTEM),
3371 "all-local-user-logs" => Some(SOURCE_TYPE_LOCAL_USER),
3372 "all-local-namespaces" => Some(SOURCE_TYPE_LOCAL_NAMESPACE),
3373 "all-uncategorized" => Some(SOURCE_TYPE_LOCAL_OTHER),
3374 _ => None,
3375 }
3376}
3377
3378fn journal_file_source_type(path: &Path) -> u64 {
3379 let text = path.to_string_lossy();
3380 let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
3381 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER;
3382 };
3383 if text.contains("/remote/") {
3384 return SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL;
3385 }
3386 if local_namespace_source_name(path).is_some() {
3387 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE;
3388 }
3389 if name.starts_with("system") {
3390 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM;
3391 }
3392 if name.starts_with("user") {
3393 return SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER;
3394 }
3395 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
3396}
3397
/// Derives a `namespace-<name>` source name from the file's parent directory.
///
/// The namespace is the text after the last `.` in the parent directory name.
/// Returns `None` when there is no parent directory name, it is not valid
/// UTF-8, it has no `.`, or the text after the last `.` is empty.
fn local_namespace_source_name(path: &Path) -> Option<String> {
    let directory = path.parent()?.file_name()?.to_str()?;
    match directory.rsplit_once('.') {
        Some((_, namespace)) if !namespace.is_empty() => Some(format!("namespace-{namespace}")),
        _ => None,
    }
}
3403
3404fn journal_file_exact_source_name(path: &Path) -> Option<String> {
3405 let text = path.to_string_lossy();
3406 if text.contains("/remote/") {
3407 let name = path.file_name()?.to_str()?;
3408 let source = name
3409 .split_once('@')
3410 .map(|(prefix, _)| prefix)
3411 .unwrap_or_else(|| {
3412 name.strip_suffix(".journal~.zst")
3413 .or_else(|| name.strip_suffix(".journal.zst"))
3414 .or_else(|| name.strip_suffix(".journal~"))
3415 .or_else(|| name.strip_suffix(".journal"))
3416 .unwrap_or(name)
3417 });
3418 return source.starts_with("remote-").then(|| source.to_string());
3419 }
3420 local_namespace_source_name(path)
3421}
3422
3423fn normalize_filter_value(field: &str, value: &str) -> Vec<u8> {
3424 if field == "PRIORITY" {
3425 if let Some(priority) = priority_name_to_number(value) {
3426 return priority.as_bytes().to_vec();
3427 }
3428 }
3429 value.as_bytes().to_vec()
3430}
3431
3432fn parse_string_array(value: Option<&Value>) -> Option<Vec<String>> {
3433 let Value::Array(items) = value? else {
3434 return None;
3435 };
3436 Some(
3437 items
3438 .iter()
3439 .filter_map(Value::as_str)
3440 .map(ToOwned::to_owned)
3441 .collect(),
3442 )
3443}
3444
3445fn request_direction(object: &Map<String, Value>) -> Direction {
3446 match get_str(object, "direction").unwrap_or("backward") {
3447 "forward" | "forwards" | "next" => Direction::Forward,
3448 _ => Direction::Backward,
3449 }
3450}
3451
3452fn request_delta(data_only: bool, object: &Map<String, Value>) -> bool {
3453 data_only && get_bool(object, "delta").unwrap_or(false)
3454}
3455
3456fn request_tail(data_only: bool, if_modified_since_usec: u64, object: &Map<String, Value>) -> bool {
3457 data_only && if_modified_since_usec != 0 && get_bool(object, "tail").unwrap_or(false)
3458}
3459
3460fn tail_after_realtime_bound(
3461 after_realtime_usec: Option<u64>,
3462 anchor: ExplorerAnchor,
3463) -> Option<u64> {
3464 let ExplorerAnchor::Realtime(anchor) = anchor else {
3465 return after_realtime_usec;
3466 };
3467 let tail_after = anchor.saturating_add(1);
3468 Some(
3469 after_realtime_usec
3470 .map(|after| after.max(tail_after))
3471 .unwrap_or(tail_after),
3472 )
3473}
3474
3475fn before_realtime_bound_excluding_anchor(
3476 before_realtime_usec: Option<u64>,
3477 anchor: ExplorerAnchor,
3478) -> Option<u64> {
3479 let ExplorerAnchor::Realtime(anchor) = anchor else {
3480 return before_realtime_usec;
3481 };
3482 let before_anchor = anchor.saturating_sub(1);
3483 Some(
3484 before_realtime_usec
3485 .map(|before| before.min(before_anchor))
3486 .unwrap_or(before_anchor),
3487 )
3488}
3489
3490fn request_anchor_and_direction(
3491 object: &Map<String, Value>,
3492 tail: bool,
3493 direction: Direction,
3494 after_realtime_usec: Option<u64>,
3495 before_realtime_usec: Option<u64>,
3496) -> (ExplorerAnchor, Direction) {
3497 let anchor = get_u64(object, "anchor")
3498 .filter(|value| *value != 0)
3499 .map(normalize_timestamp_to_usec)
3500 .map(ExplorerAnchor::Realtime)
3501 .unwrap_or(ExplorerAnchor::Auto);
3502 if tail && matches!(anchor, ExplorerAnchor::Realtime(_)) {
3503 return (anchor, Direction::Backward);
3504 }
3505 if anchor_outside_window(anchor, after_realtime_usec, before_realtime_usec) {
3506 (ExplorerAnchor::Auto, Direction::Backward)
3507 } else {
3508 (anchor, direction)
3509 }
3510}
3511
3512fn anchor_outside_window(
3513 anchor: ExplorerAnchor,
3514 after_realtime_usec: Option<u64>,
3515 before_realtime_usec: Option<u64>,
3516) -> bool {
3517 let ExplorerAnchor::Realtime(anchor_usec) = anchor else {
3518 return false;
3519 };
3520 after_realtime_usec.is_some_and(|after| anchor_usec < after)
3521 || before_realtime_usec.is_some_and(|before| anchor_usec > before)
3522}
3523
3524fn request_limit(object: &Map<String, Value>) -> usize {
3525 get_u64(object, "last")
3526 .filter(|value| *value != 0)
3527 .map(|value| value as usize)
3528 .unwrap_or(DEFAULT_ITEMS_TO_RETURN)
3529}
3530
3531fn request_facets(
3532 requested_facets: &Option<Vec<String>>,
3533 config: &NetdataFunctionConfig,
3534) -> Vec<Vec<u8>> {
3535 requested_facets
3536 .clone()
3537 .unwrap_or_else(|| config.default_facets.clone())
3538 .into_iter()
3539 .map(Vec::from)
3540 .collect()
3541}
3542
3543fn request_histogram(object: &Map<String, Value>) -> Option<String> {
3544 get_str(object, "histogram")
3545 .filter(|histogram| !histogram.is_empty())
3546 .map(ToOwned::to_owned)
3547}
3548
3549fn request_histogram_or_default(
3550 requested_histogram: &Option<String>,
3551 config: &NetdataFunctionConfig,
3552) -> Option<String> {
3553 requested_histogram
3554 .clone()
3555 .or_else(|| config.default_histogram.clone())
3556}
3557
3558fn request_query(object: &Map<String, Value>) -> Option<String> {
3559 get_str(object, "query")
3560 .filter(|query| !query.is_empty())
3561 .map(ToOwned::to_owned)
3562}
3563
3564fn get_bool(object: &Map<String, Value>, key: &str) -> Option<bool> {
3565 object.get(key).and_then(Value::as_bool)
3566}
3567
3568fn get_i64(object: &Map<String, Value>, key: &str) -> Option<i64> {
3569 object.get(key).and_then(Value::as_i64)
3570}
3571
3572fn get_u64(object: &Map<String, Value>, key: &str) -> Option<u64> {
3573 object.get(key).and_then(Value::as_u64)
3574}
3575
3576fn get_str<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
3577 object.get(key).and_then(Value::as_str)
3578}
3579
3580fn normalize_time_window(
3581 now_seconds: i64,
3582 after: Option<i64>,
3583 before: Option<i64>,
3584) -> (Option<u64>, Option<u64>) {
3585 let mut after = after.unwrap_or(0);
3586 let mut before = before.unwrap_or(0);
3587
3588 if after == 0 && before == 0 {
3589 before = now_seconds;
3590 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3591 } else {
3592 (after, before) = relative_window_to_absolute(now_seconds, after, before);
3593 }
3594
3595 if after > before {
3596 std::mem::swap(&mut after, &mut before);
3597 }
3598 if after == before {
3599 after = before.saturating_sub(DEFAULT_TIME_WINDOW_SECONDS);
3600 }
3601
3602 (
3603 Some(normalize_timestamp_to_usec_with_rounding(
3604 after.max(0) as u64,
3605 false,
3606 )),
3607 Some(normalize_timestamp_to_usec_with_rounding(
3608 before.max(0) as u64,
3609 true,
3610 )),
3611 )
3612}
3613
3614fn relative_window_to_absolute(now_seconds: i64, after: i64, before: i64) -> (i64, i64) {
3615 let mut after = after;
3616 let mut before = before;
3617
3618 if before.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3619 if before > 0 {
3620 before = -before;
3621 }
3622 before = now_seconds.saturating_add(before);
3623 }
3624
3625 if after.unsigned_abs() <= API_RELATIVE_TIME_MAX_SECONDS as u64 {
3626 if after > 0 {
3627 after = -after;
3628 }
3629 if after == 0 {
3630 after = -NETDATA_MISSING_AFTER_RELATIVE_SECONDS;
3631 }
3632 after = before.saturating_add(after).saturating_add(1);
3633 }
3634
3635 if after > before {
3636 std::mem::swap(&mut after, &mut before);
3637 }
3638
3639 if before > now_seconds {
3640 let delta = before.saturating_sub(now_seconds);
3641 before = before.saturating_sub(delta);
3642 after = after.saturating_sub(delta);
3643 }
3644
3645 (after, before)
3646}
3647
/// Normalized request parameters that `normalized_request_echo` serializes
/// back into the response.
struct RequestEchoInput<'a> {
    /// Echoed as `info`.
    info: bool,
    /// Lower time bound in microseconds; echoed as `after` in whole seconds.
    after_realtime_usec: Option<u64>,
    /// Upper time bound in microseconds; echoed as `before` in whole seconds.
    before_realtime_usec: Option<u64>,
    /// Echoed unchanged as `if_modified_since`.
    if_modified_since_usec: u64,
    /// Echoed as `anchor`; only a realtime anchor produces a non-zero value.
    anchor: ExplorerAnchor,
    /// Echoed as `direction` (`"forward"` or `"backward"`).
    direction: Direction,
    /// Echoed as `last`.
    limit: usize,
    /// Echoed as `data_only`.
    data_only: bool,
    /// Echoed as `delta`.
    delta: bool,
    /// Echoed as `tail`.
    tail: bool,
    /// Echoed as `sampling`.
    sampling: u64,
    /// Echoed as `source_type`.
    source_type: u64,
    /// Echoed as the `facets` array when present.
    requested_facets: Option<&'a [String]>,
    /// Echoed as `selections` when it is a JSON object; the entries of its
    /// `__logs_sources` array are replaced with `null`.
    selections: Option<&'a Value>,
    /// Echoed as `histogram`.
    histogram: Option<&'a str>,
    /// Echoed as `query`.
    query: Option<&'a str>,
}
3666
3667fn normalized_request_echo(input: &RequestEchoInput<'_>) -> Value {
3668 let anchor_usec = match input.anchor {
3669 ExplorerAnchor::Realtime(usec) => usec,
3670 ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => 0,
3671 };
3672 let mut out = json!({
3673 "info": input.info,
3674 "slice": true,
3678 "data_only": input.data_only,
3679 "delta": input.delta,
3680 "tail": input.tail,
3681 "sampling": input.sampling,
3682 "source_type": input.source_type,
3683 "after": input.after_realtime_usec.unwrap_or(0) / 1_000_000,
3684 "before": input.before_realtime_usec.unwrap_or(0) / 1_000_000,
3685 "if_modified_since": input.if_modified_since_usec,
3686 "anchor": anchor_usec,
3687 "direction": match input.direction {
3688 Direction::Forward => "forward",
3689 Direction::Backward => "backward",
3690 },
3691 "last": input.limit,
3692 "query": input.query,
3693 "histogram": input.histogram,
3694 });
3695 if let Some(facets) = input.requested_facets {
3696 if let Some(object) = out.as_object_mut() {
3697 object.insert(
3698 "facets".to_string(),
3699 facets
3700 .iter()
3701 .map(|field| Value::String(field.clone()))
3702 .collect(),
3703 );
3704 }
3705 }
3706 if let Some(Value::Object(selections)) = input.selections {
3707 let mut selections = selections.clone();
3708 if let Some(Value::Array(sources)) = selections.get_mut("__logs_sources") {
3709 for source in sources {
3710 *source = Value::Null;
3711 }
3712 }
3713 if let Some(object) = out.as_object_mut() {
3714 object.insert("selections".to_string(), Value::Object(selections));
3715 }
3716 }
3717 out
3718}
3719
3720fn normalize_timestamp_to_usec(value: u64) -> u64 {
3721 normalize_timestamp_to_usec_with_rounding(value, false)
3722}
3723
/// Normalizes a timestamp to microseconds.
///
/// Values of at least 10^12 are returned unchanged. Smaller values are
/// multiplied by 1,000,000 (saturating), and `end_of_second` additionally
/// adds 999,999 so the result is the last microsecond of that second.
fn normalize_timestamp_to_usec_with_rounding(value: u64, end_of_second: bool) -> u64 {
    const PASSTHROUGH_THRESHOLD: u64 = 1_000_000_000_000;
    const USEC_PER_SECOND: u64 = 1_000_000;

    if value >= PASSTHROUGH_THRESHOLD {
        return value;
    }
    let start_of_second = value.saturating_mul(USEC_PER_SECOND);
    if end_of_second {
        start_of_second.saturating_add(USEC_PER_SECOND - 1)
    } else {
        start_of_second
    }
}
3733
/// Current wall-clock time in whole seconds since the Unix epoch, or `0`
/// when the system clock reads earlier than the epoch.
fn unix_now_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
3740
/// Formats a byte count with binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Sizes below 1024 print as plain bytes. Larger sizes are scaled to the
/// largest fitting unit and printed with at most two decimals, with trailing
/// zeros (and a dangling decimal point) removed.
fn human_binary_size(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];

    let mut scaled = bytes as f64;
    let mut unit_index = 0usize;
    while scaled >= 1024.0 && unit_index + 1 < UNITS.len() {
        scaled /= 1024.0;
        unit_index += 1;
    }
    let suffix = UNITS[unit_index];

    if unit_index == 0 {
        return format!("{bytes}{suffix}");
    }
    if scaled.fract() == 0.0 {
        return format!("{scaled:.0}{suffix}");
    }
    let rounded = format!("{scaled:.2}");
    // `{:.2}` always emits a decimal point, so only fractional zeros are cut.
    let trimmed = rounded.trim_end_matches('0').trim_end_matches('.');
    format!("{trimmed}{suffix}")
}
3764
/// Formats a duration in seconds as space-separated components such as
/// `1y 2mo 3d 4h 5m 6s`.
///
/// A year counts as 365 days and a month as 30 days. Zero components are
/// omitted, except that a zero duration prints as `0s`.
fn human_duration_seconds(seconds: u64) -> String {
    const UNITS: [(u64, &str); 5] = [
        (365 * 86_400, "y"),
        (30 * 86_400, "mo"),
        (86_400, "d"),
        (3600, "h"),
        (60, "m"),
    ];

    let mut remaining = seconds;
    let mut parts = Vec::new();
    for &(span, suffix) in UNITS.iter() {
        let count = remaining / span;
        remaining %= span;
        if count != 0 {
            parts.push(format!("{count}{suffix}"));
        }
    }
    if remaining != 0 || parts.is_empty() {
        parts.push(format!("{remaining}s"));
    }
    parts.join(" ")
}
3796
/// Outcome of scanning a directory tree for journal files
/// (see `collect_journal_files`).
#[derive(Debug, Default)]
struct JournalFileCollection {
    /// Journal file paths that were found.
    files: Vec<PathBuf>,
    /// Number of directories that were skipped during the scan.
    skipped: u64,
    /// One message per skipped directory, explaining why it was skipped.
    errors: Vec<String>,
}
3803
3804fn collect_journal_files(path: &Path) -> Result<JournalFileCollection> {
3805 if !path.is_dir() {
3806 return Err(SdkError::InvalidPath(format!(
3807 "not a directory: {}",
3808 path.display()
3809 )));
3810 }
3811 let mut collection = JournalFileCollection::default();
3812 let mut pending = VecDeque::from([(path.to_path_buf(), 0usize)]);
3813 let mut visited = HashSet::new();
3814 while let Some((directory, depth)) = pending.pop_front() {
3815 let visited_key = std::fs::canonicalize(&directory).unwrap_or_else(|_| directory.clone());
3816 if visited.contains(&visited_key) {
3817 continue;
3818 }
3819 if visited.len() >= NETDATA_MAX_DIRECTORY_SCAN_COUNT {
3820 collection.skipped = collection.skipped.saturating_add(1);
3821 collection.errors.push(format!(
3822 "{}: directory scan limit reached",
3823 directory.display()
3824 ));
3825 continue;
3826 }
3827 visited.insert(visited_key);
3828 let entries = match std::fs::read_dir(&directory) {
3829 Ok(entries) => entries,
3830 Err(err) if directory == path => return Err(err.into()),
3831 Err(err) => {
3832 collection.skipped = collection.skipped.saturating_add(1);
3833 collection
3834 .errors
3835 .push(format!("{}: {err}", directory.display()));
3836 continue;
3837 }
3838 };
3839 for entry in entries.flatten() {
3840 let entry_path = entry.path();
3841 if entry_path.is_file() && is_journal_file_name(&entry_path) {
3842 collection.files.push(entry_path);
3843 } else if depth < NETDATA_MAX_DIRECTORY_SCAN_DEPTH && entry_path.is_dir() {
3844 pending.push_back((entry_path, depth + 1));
3845 }
3846 }
3847 }
3848 collection.files.sort();
3849 dedup_journal_files_by_canonical_path(&mut collection.files);
3850 Ok(collection)
3851}
3852
/// Removes paths that resolve to a file already listed, keeping the first
/// occurrence of each.
///
/// Paths are compared by canonical form; a path that cannot be canonicalized
/// is compared as written.
fn dedup_journal_files_by_canonical_path(files: &mut Vec<PathBuf>) {
    let mut identities = HashSet::new();
    files.retain(|candidate| {
        let identity = match std::fs::canonicalize(candidate) {
            Ok(resolved) => resolved,
            Err(_) => candidate.clone(),
        };
        identities.insert(identity)
    });
}
3860
/// Timing facts about one journal file, used to order files
/// (see `journal_file_order_info` and `compare_journal_file_order`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JournalFileOrderInfo {
    /// Realtime timestamp of the first message, in microseconds.
    msg_first_realtime_usec: u64,
    /// Realtime timestamp of the last message, in microseconds.
    msg_last_realtime_usec: u64,
    /// Last-modified time of the file, in microseconds since the Unix epoch.
    file_last_modified_usec: u64,
    /// Journal-versus-realtime delta, in microseconds.
    journal_vs_realtime_delta_usec: u64,
}
3868
3869fn journal_file_order_info(
3870 path: &Path,
3871 reader_options: ReaderOptions,
3872 metadata: Option<&NetdataJournalFileMetadata>,
3873) -> JournalFileOrderInfo {
3874 let file_last_modified_usec = std::fs::metadata(path)
3875 .ok()
3876 .and_then(|metadata| metadata.modified().ok())
3877 .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
3878 .map(|duration| duration.as_micros().min(u128::from(u64::MAX)) as u64)
3879 .unwrap_or_default();
3880 let file_last_modified_usec = metadata
3881 .and_then(|metadata| metadata.file_last_modified_usec)
3882 .unwrap_or(file_last_modified_usec);
3883 let journal_vs_realtime_delta_usec = metadata
3884 .and_then(|metadata| metadata.journal_vs_realtime_delta_usec)
3885 .map(normalize_journal_vs_realtime_delta_usec)
3886 .unwrap_or(NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
3887
3888 let Ok(reader) = FileReader::open_with_options(path, reader_options) else {
3889 return JournalFileOrderInfo {
3890 msg_first_realtime_usec: 0,
3891 msg_last_realtime_usec: file_last_modified_usec,
3892 file_last_modified_usec,
3893 journal_vs_realtime_delta_usec,
3894 };
3895 };
3896 let header = reader.header();
3897 let msg_first_realtime_usec = metadata
3898 .and_then(|metadata| metadata.msg_first_realtime_usec)
3899 .unwrap_or(header.head_entry_realtime);
3900 let msg_last_realtime_usec = metadata
3901 .and_then(|metadata| metadata.msg_last_realtime_usec)
3902 .unwrap_or_else(|| {
3903 if header.tail_entry_realtime == 0 {
3904 file_last_modified_usec
3905 } else {
3906 header.tail_entry_realtime
3907 }
3908 });
3909 JournalFileOrderInfo {
3910 msg_first_realtime_usec,
3911 msg_last_realtime_usec,
3912 file_last_modified_usec,
3913 journal_vs_realtime_delta_usec,
3914 }
3915}
3916
3917fn compare_journal_file_order(
3918 left: &JournalFileOrderInfo,
3919 right: &JournalFileOrderInfo,
3920 direction: Direction,
3921) -> Ordering {
3922 let backward = right
3923 .msg_last_realtime_usec
3924 .cmp(&left.msg_last_realtime_usec)
3925 .then_with(|| {
3926 right
3927 .file_last_modified_usec
3928 .cmp(&left.file_last_modified_usec)
3929 })
3930 .then_with(|| {
3931 right
3932 .msg_first_realtime_usec
3933 .cmp(&left.msg_first_realtime_usec)
3934 });
3935 match direction {
3936 Direction::Backward => backward,
3937 Direction::Forward => backward.reverse(),
3938 }
3939}
3940
/// Reports whether the path's file name ends with a journal suffix:
/// `.journal`, `.journal~`, `.journal.zst` or `.journal~.zst`.
///
/// Paths without a file name, or with one that is not valid UTF-8, are not
/// journal files.
fn is_journal_file_name(path: &Path) -> bool {
    const JOURNAL_SUFFIXES: [&str; 4] = [".journal", ".journal~", ".journal.zst", ".journal~.zst"];

    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => JOURNAL_SUFFIXES
            .iter()
            .any(|suffix| name.ends_with(*suffix)),
        None => false,
    }
}
3951
3952fn push_unique_many(target: &mut Vec<String>, values: &[String]) {
3953 for value in values {
3954 push_unique(target, value);
3955 }
3956}
3957
/// Converts raw field names to strings, dropping any that are not valid
/// UTF-8.
fn string_fields(fields: &[Vec<u8>]) -> Vec<String> {
    let mut names = Vec::new();
    for field in fields {
        if let Ok(text) = std::str::from_utf8(field) {
            names.push(text.to_owned());
        }
    }
    names
}
3964
/// Appends `value` to `target` unless an equal string is already present.
fn push_unique(target: &mut Vec<String>, value: impl AsRef<str>) {
    let candidate = value.as_ref();
    let already_present = target.iter().any(|existing| existing.as_str() == candidate);
    if already_present {
        return;
    }
    target.push(candidate.to_owned());
}
3971
/// Builds a key from a name: leading ASCII punctuation is removed and ASCII
/// letters are lowercased.
fn netdata_reorder_key(value: &str) -> String {
    let first_significant = value
        .find(|character: char| !character.is_ascii_punctuation())
        .unwrap_or(value.len());
    value[first_significant..].to_ascii_lowercase()
}
3977
3978fn histogram_update_every_seconds(histogram: &ExplorerHistogram) -> u64 {
3979 histogram
3980 .buckets
3981 .first()
3982 .map(|bucket| {
3983 bucket
3984 .end_realtime_usec
3985 .saturating_sub(bucket.start_realtime_usec)
3986 .checked_div(1_000_000)
3987 .unwrap_or(1)
3988 .max(1)
3989 })
3990 .unwrap_or(1)
3991}
3992
/// Level of detail used by `format_realtime_usec`.
enum TimestampPrecision {
    /// Whole seconds.
    Seconds,
    /// Six fractional digits.
    Micros,
}
3997
3998fn format_realtime_usec(timestamp: u64, precision: TimestampPrecision) -> String {
3999 let seconds = (timestamp / 1_000_000) as i64;
4000 let micros = (timestamp % 1_000_000) as u32;
4001 DateTime::<Utc>::from_timestamp(seconds, micros.saturating_mul(1000))
4002 .map(|datetime| match precision {
4003 TimestampPrecision::Seconds => datetime.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
4004 TimestampPrecision::Micros => datetime.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
4005 })
4006 .unwrap_or_else(|| timestamp.to_string())
4007}
4008
4009fn priority_name(raw: &str) -> Option<&'static str> {
4010 match parse_priority(raw)? {
4011 0 => Some("panic"),
4012 1 => Some("alert"),
4013 2 => Some("critical"),
4014 3 => Some("error"),
4015 4 => Some("warning"),
4016 5 => Some("notice"),
4017 6 => Some("info"),
4018 7 => Some("debug"),
4019 _ => None,
4020 }
4021}
4022
/// Maps a priority name or one of its accepted aliases to the numeric
/// priority string (`"0"` to `"7"`). Matching is case-sensitive; unknown
/// names give `None`.
fn priority_name_to_number(value: &str) -> Option<&'static str> {
    const ALIASES: [(&str, &str); 12] = [
        ("panic", "0"),
        ("emergency", "0"),
        ("emerg", "0"),
        ("alert", "1"),
        ("critical", "2"),
        ("crit", "2"),
        ("error", "3"),
        ("err", "3"),
        ("warning", "4"),
        ("warn", "4"),
        ("notice", "5"),
        ("info", "6"),
    ];
    if value == "debug" {
        return Some("7");
    }
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == value)
        .map(|(_, number)| *number)
}
4036
/// Parses a decimal priority value, returning `None` when the text is not a
/// number in the `u8` range.
fn parse_priority(raw: &str) -> Option<u8> {
    u8::from_str_radix(raw, 10).ok()
}
4040
4041fn priority_to_row_severity(raw: &[u8]) -> &'static str {
4042 let raw = String::from_utf8_lossy(raw);
4043 match parse_priority(&raw) {
4044 Some(priority) if priority <= 3 => "critical",
4045 Some(4) => "warning",
4046 Some(5) => "notice",
4047 Some(priority) if priority >= 7 => "debug",
4048 _ => "normal",
4049 }
4050}
4051
/// Maps a numeric syslog facility string to its name.
///
/// Codes 0 to 11 and 16 to 23 (`local0` to `local7`) are known; any other
/// input gives `None`.
fn syslog_facility_name(raw: &str) -> Option<&'static str> {
    const STANDARD: [&str; 12] = [
        "kern", "user", "mail", "daemon", "auth", "syslog", "lpr", "news", "uucp", "cron",
        "authpriv", "ftp",
    ];
    const LOCAL: [&str; 8] = [
        "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7",
    ];

    let code = raw.parse::<u8>().ok()?;
    match code {
        0..=11 => Some(STANDARD[usize::from(code)]),
        16..=23 => Some(LOCAL[usize::from(code - 16)]),
        _ => None,
    }
}
4077
/// Lookup table from errno number to symbolic name, used by `errno_name`.
///
/// Entries are listed in ascending numeric order; numbers 41 and 58 have no
/// entry.
// NOTE(review): the numbering looks like the Linux errno assignment — confirm
// before relying on it for journals written on other platforms.
const ERRNO_NAMES: &[(u32, &str)] = &[
    (1, "EPERM"),
    (2, "ENOENT"),
    (3, "ESRCH"),
    (4, "EINTR"),
    (5, "EIO"),
    (6, "ENXIO"),
    (7, "E2BIG"),
    (8, "ENOEXEC"),
    (9, "EBADF"),
    (10, "ECHILD"),
    (11, "EAGAIN"),
    (12, "ENOMEM"),
    (13, "EACCES"),
    (14, "EFAULT"),
    (15, "ENOTBLK"),
    (16, "EBUSY"),
    (17, "EEXIST"),
    (18, "EXDEV"),
    (19, "ENODEV"),
    (20, "ENOTDIR"),
    (21, "EISDIR"),
    (22, "EINVAL"),
    (23, "ENFILE"),
    (24, "EMFILE"),
    (25, "ENOTTY"),
    (26, "ETXTBSY"),
    (27, "EFBIG"),
    (28, "ENOSPC"),
    (29, "ESPIPE"),
    (30, "EROFS"),
    (31, "EMLINK"),
    (32, "EPIPE"),
    (33, "EDOM"),
    (34, "ERANGE"),
    (35, "EDEADLK"),
    (36, "ENAMETOOLONG"),
    (37, "ENOLCK"),
    (38, "ENOSYS"),
    (39, "ENOTEMPTY"),
    (40, "ELOOP"),
    (42, "ENOMSG"),
    (43, "EIDRM"),
    (44, "ECHRNG"),
    (45, "EL2NSYNC"),
    (46, "EL3HLT"),
    (47, "EL3RST"),
    (48, "ELNRNG"),
    (49, "EUNATCH"),
    (50, "ENOCSI"),
    (51, "EL2HLT"),
    (52, "EBADE"),
    (53, "EBADR"),
    (54, "EXFULL"),
    (55, "ENOANO"),
    (56, "EBADRQC"),
    (57, "EBADSLT"),
    (59, "EBFONT"),
    (60, "ENOSTR"),
    (61, "ENODATA"),
    (62, "ETIME"),
    (63, "ENOSR"),
    (64, "ENONET"),
    (65, "ENOPKG"),
    (66, "EREMOTE"),
    (67, "ENOLINK"),
    (68, "EADV"),
    (69, "ESRMNT"),
    (70, "ECOMM"),
    (71, "EPROTO"),
    (72, "EMULTIHOP"),
    (73, "EDOTDOT"),
    (74, "EBADMSG"),
    (75, "EOVERFLOW"),
    (76, "ENOTUNIQ"),
    (77, "EBADFD"),
    (78, "EREMCHG"),
    (79, "ELIBACC"),
    (80, "ELIBBAD"),
    (81, "ELIBSCN"),
    (82, "ELIBMAX"),
    (83, "ELIBEXEC"),
    (84, "EILSEQ"),
    (85, "ERESTART"),
    (86, "ESTRPIPE"),
    (87, "EUSERS"),
    (88, "ENOTSOCK"),
    (89, "EDESTADDRREQ"),
    (90, "EMSGSIZE"),
    (91, "EPROTOTYPE"),
    (92, "ENOPROTOOPT"),
    (93, "EPROTONOSUPPORT"),
    (94, "ESOCKTNOSUPPORT"),
    (95, "ENOTSUP"),
    (96, "EPFNOSUPPORT"),
    (97, "EAFNOSUPPORT"),
    (98, "EADDRINUSE"),
    (99, "EADDRNOTAVAIL"),
    (100, "ENETDOWN"),
    (101, "ENETUNREACH"),
    (102, "ENETRESET"),
    (103, "ECONNABORTED"),
    (104, "ECONNRESET"),
    (105, "ENOBUFS"),
    (106, "EISCONN"),
    (107, "ENOTCONN"),
    (108, "ESHUTDOWN"),
    (109, "ETOOMANYREFS"),
    (110, "ETIMEDOUT"),
    (111, "ECONNREFUSED"),
    (112, "EHOSTDOWN"),
    (113, "EHOSTUNREACH"),
    (114, "EALREADY"),
    (115, "EINPROGRESS"),
    (116, "ESTALE"),
    (117, "EUCLEAN"),
    (118, "ENOTNAM"),
    (119, "ENAVAIL"),
    (120, "EISNAM"),
    (121, "EREMOTEIO"),
    (122, "EDQUOT"),
    (123, "ENOMEDIUM"),
    (124, "EMEDIUMTYPE"),
    (125, "ECANCELED"),
    (126, "ENOKEY"),
    (127, "EKEYEXPIRED"),
    (128, "EKEYREVOKED"),
    (129, "EKEYREJECTED"),
    (130, "EOWNERDEAD"),
    (131, "ENOTRECOVERABLE"),
    (132, "ERFKILL"),
    (133, "EHWPOISON"),
];
4211
4212fn errno_name(raw: &str) -> Option<String> {
4213 let errno = raw.parse::<u32>().ok()?;
4214 let name = ERRNO_NAMES
4215 .iter()
4216 .find_map(|(candidate, name)| (*candidate == errno).then_some(*name))?;
4217 Some(format!("{errno} ({name})"))
4218}
4219
/// Annotates a hexadecimal capability mask with the names of its set bits.
///
/// The input is returned unchanged when it does not start with an ASCII
/// digit, is not valid hexadecimal, is zero, or sets no named bit. Otherwise
/// the result is `<raw> (<NAME> | <NAME> | ...)` in ascending bit order.
fn cap_effective_display(raw: &str) -> String {
    // Index in this table is the bit position in the mask.
    const CAPABILITIES: &[&str] = &[
        "CHOWN",
        "DAC_OVERRIDE",
        "DAC_READ_SEARCH",
        "FOWNER",
        "FSETID",
        "KILL",
        "SETGID",
        "SETUID",
        "SETPCAP",
        "LINUX_IMMUTABLE",
        "NET_BIND_SERVICE",
        "NET_BROADCAST",
        "NET_ADMIN",
        "NET_RAW",
        "IPC_LOCK",
        "IPC_OWNER",
        "SYS_MODULE",
        "SYS_RAWIO",
        "SYS_CHROOT",
        "SYS_PTRACE",
        "SYS_PACCT",
        "SYS_ADMIN",
        "SYS_BOOT",
        "SYS_NICE",
        "SYS_RESOURCE",
        "SYS_TIME",
        "SYS_TTY_CONFIG",
        "MKNOD",
        "LEASE",
        "AUDIT_WRITE",
        "AUDIT_CONTROL",
        "SETFCAP",
        "MAC_OVERRIDE",
        "MAC_ADMIN",
        "SYSLOG",
        "WAKE_ALARM",
        "BLOCK_SUSPEND",
        "AUDIT_READ",
        "PERFMON",
        "BPF",
        "CHECKPOINT_RESTORE",
    ];

    let starts_with_digit = matches!(raw.as_bytes().first(), Some(byte) if byte.is_ascii_digit());
    let mask = match u64::from_str_radix(raw, 16) {
        Ok(mask) if starts_with_digit && mask != 0 => mask,
        _ => return raw.to_string(),
    };

    let mut names: Vec<&str> = Vec::new();
    for (bit, name) in CAPABILITIES.iter().enumerate() {
        if (mask >> bit) & 1 == 1 {
            names.push(*name);
        }
    }
    if names.is_empty() {
        raw.to_string()
    } else {
        format!("{raw} ({})", names.join(" | "))
    }
}
4284
4285fn systemd_field_display_value(
4286 context: &DisplayContext,
4287 scope: DisplayScope,
4288 field: &str,
4289 value: &[u8],
4290 resolve_user_group_names: bool,
4291) -> Value {
4292 let raw = String::from_utf8_lossy(value);
4293 match field {
4294 "PRIORITY" => Value::String(priority_name(&raw).unwrap_or(&raw).to_string()),
4295 "SYSLOG_FACILITY" => Value::String(syslog_facility_name(&raw).unwrap_or(&raw).to_string()),
4296 "ERRNO" => Value::String(errno_name(&raw).unwrap_or_else(|| raw.to_string())),
4297 "MESSAGE_ID" => Value::String(match (message_id_name(&raw), scope) {
4298 (Some(name), DisplayScope::Data) => format!("{raw} ({name})"),
4299 (Some(name), DisplayScope::Facet | DisplayScope::Histogram) => name.to_string(),
4300 (None, _) => raw.into_owned(),
4301 }),
4302 "_BOOT_ID" => Value::String(match (context.boot_first_realtime.get(value), scope) {
4303 (Some(timestamp), DisplayScope::Data) => format!(
4304 "{} ({}) ",
4305 raw,
4306 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4307 ),
4308 (Some(timestamp), DisplayScope::Facet | DisplayScope::Histogram) => {
4309 format_realtime_usec(*timestamp, TimestampPrecision::Seconds)
4310 }
4311 (None, _) => raw.into_owned(),
4312 }),
4313 "_UID"
4314 | "_SYSTEMD_OWNER_UID"
4315 | "OBJECT_SYSTEMD_OWNER_UID"
4316 | "OBJECT_UID"
4317 | "_AUDIT_LOGINUID"
4318 | "OBJECT_AUDIT_LOGINUID" => {
4319 if resolve_user_group_names {
4320 Value::String(cached_uid_display(context, raw.as_ref()))
4321 } else {
4322 Value::String(raw.into_owned())
4323 }
4324 }
4325 "_GID" | "OBJECT_GID" => {
4326 if resolve_user_group_names {
4327 Value::String(cached_gid_display(context, raw.as_ref()))
4328 } else {
4329 Value::String(raw.into_owned())
4330 }
4331 }
4332 "_CAP_EFFECTIVE" => Value::String(cap_effective_display(&raw)),
4333 "_SOURCE_REALTIME_TIMESTAMP" => Value::String(match raw.parse::<u64>() {
4334 Ok(timestamp) if timestamp != 0 => {
4335 format!(
4336 "{} ({})",
4337 raw,
4338 format_realtime_usec(timestamp, TimestampPrecision::Micros)
4339 )
4340 }
4341 _ => raw.into_owned(),
4342 }),
4343 _ => Value::String(raw.into_owned()),
4344 }
4345}
4346
4347fn cached_uid_display(context: &DisplayContext, raw: &str) -> String {
4348 if let Some(value) = context.uid_display_cache.borrow().get(raw) {
4349 return value.clone();
4350 }
4351 let value = resolve_uid_name(raw).unwrap_or_else(|| raw.to_string());
4352 context
4353 .uid_display_cache
4354 .borrow_mut()
4355 .insert(raw.to_string(), value.clone());
4356 value
4357}
4358
4359fn cached_gid_display(context: &DisplayContext, raw: &str) -> String {
4360 if let Some(value) = context.gid_display_cache.borrow().get(raw) {
4361 return value.clone();
4362 }
4363 let value = resolve_gid_name(raw).unwrap_or_else(|| raw.to_string());
4364 context
4365 .gid_display_cache
4366 .borrow_mut()
4367 .insert(raw.to_string(), value.clone());
4368 value
4369}
4370
4371#[cfg(unix)]
4372fn resolve_uid_name(raw: &str) -> Option<String> {
4373 let uid = raw.parse::<libc::uid_t>().ok()?;
4374 let mut pwd = std::mem::MaybeUninit::<libc::passwd>::uninit();
4375 let mut result = std::ptr::null_mut();
4376 let mut buffer = vec![0i8; 16_384];
4377 let rc = unsafe {
4378 libc::getpwuid_r(
4379 uid,
4380 pwd.as_mut_ptr(),
4381 buffer.as_mut_ptr(),
4382 buffer.len(),
4383 &mut result,
4384 )
4385 };
4386 if rc != 0 || result.is_null() {
4387 return None;
4388 }
4389 let pwd = unsafe { pwd.assume_init() };
4390 Some(
4391 unsafe { CStr::from_ptr(pwd.pw_name) }
4392 .to_string_lossy()
4393 .into_owned(),
4394 )
4395}
4396
/// Non-Unix fallback: performs no lookup and always returns `None`, so callers
/// such as `cached_uid_display` end up showing the raw uid text.
#[cfg(not(unix))]
fn resolve_uid_name(_raw: &str) -> Option<String> {
    None
}
4401
4402#[cfg(unix)]
4403fn resolve_gid_name(raw: &str) -> Option<String> {
4404 let gid = raw.parse::<libc::gid_t>().ok()?;
4405 let mut grp = std::mem::MaybeUninit::<libc::group>::uninit();
4406 let mut result = std::ptr::null_mut();
4407 let mut buffer = vec![0i8; 16_384];
4408 let rc = unsafe {
4409 libc::getgrgid_r(
4410 gid,
4411 grp.as_mut_ptr(),
4412 buffer.as_mut_ptr(),
4413 buffer.len(),
4414 &mut result,
4415 )
4416 };
4417 if rc != 0 || result.is_null() {
4418 return None;
4419 }
4420 let grp = unsafe { grp.assume_init() };
4421 Some(
4422 unsafe { CStr::from_ptr(grp.gr_name) }
4423 .to_string_lossy()
4424 .into_owned(),
4425 )
4426}
4427
/// Non-Unix fallback: performs no lookup and always returns `None`, so callers
/// such as `cached_gid_display` end up showing the raw gid text.
#[cfg(not(unix))]
fn resolve_gid_name(_raw: &str) -> Option<String> {
    None
}
4432
/// Lookup table pairing well-known journal `MESSAGE_ID` values with a
/// human-readable event name.
///
/// Each id is written as 32 lowercase hexadecimal digits without dashes.
/// `message_id_name` scans this table front to back and compares ids by exact
/// string equality.
const MESSAGE_ID_NAMES: &[(&str, &str)] = &[
    ("f77379a8490b408bbe5f6940505a777b", "Journal started"),
    ("d93fb3c9c24d451a97cea615ce59c00b", "Journal stopped"),
    (
        "a596d6fe7bfa4994828e72309e95d61e",
        "Journal messages suppressed",
    ),
    (
        "e9bf28e6e834481bb6f48f548ad13606",
        "Journal messages missed",
    ),
    (
        "ec387f577b844b8fa948f33cad9a75e6",
        "Journal disk space usage",
    ),
    ("fc2e22bc6ee647b6b90729ab34a250b1", "Coredump"),
    ("5aadd8e954dc4b1a8c954d63fd9e1137", "Coredump truncated"),
    ("1f4e0a44a88649939aaea34fc6da8c95", "Backtrace"),
    ("8d45620c1a4348dbb17410da57c60c66", "User Session created"),
    (
        "3354939424b4456d9802ca8333ed424a",
        "User Session terminated",
    ),
    ("fcbefc5da23d428093f97c82a9290f7b", "Seat started"),
    ("e7852bfe46784ed0accde04bc864c2d5", "Seat removed"),
    (
        "24d8d4452573402496068381a6312df2",
        "VM or container started",
    ),
    (
        "58432bd3bace477cb514b56381b8a758",
        "VM or container stopped",
    ),
    ("c7a787079b354eaaa9e77b371893cd27", "Time change"),
    ("45f82f4aef7a4bbf942ce861d1f20990", "Timezone change"),
    (
        "50876a9db00f4c40bde1a2ad381c3a1b",
        "System configuration issues",
    ),
    (
        "b07a249cd024414a82dd00cd181378ff",
        "System start-up completed",
    ),
    (
        "eed00a68ffd84e31882105fd973abdd1",
        "User start-up completed",
    ),
    ("6bbd95ee977941e497c48be27c254128", "Sleep start"),
    ("8811e6df2a8e40f58a94cea26f8ebf14", "Sleep stop"),
    (
        "98268866d1d54a499c4e98921d93bc40",
        "System shutdown initiated",
    ),
    (
        "c14aaf76ec284a5fa1f105f88dfb061c",
        "System factory reset initiated",
    ),
    ("d9ec5e95e4b646aaaea2fd05214edbda", "Container init crashed"),
    (
        "3ed0163e868a4417ab8b9e210407a96c",
        "System reboot failed after crash",
    ),
    ("645c735537634ae0a32b15a7c6cba7d4", "Init execution froze"),
    (
        "5addb3a06a734d3396b794bf98fb2d01",
        "Init crashed no coredump",
    ),
    ("5c9e98de4ab94c6a9d04d0ad793bd903", "Init crashed no fork"),
    (
        "5e6f1f5e4db64a0eaee3368249d20b94",
        "Init crashed unknown signal",
    ),
    (
        "83f84b35ee264f74a3896a9717af34cb",
        "Init crashed systemd signal",
    ),
    (
        "3a73a98baf5b4b199929e3226c0be783",
        "Init crashed process signal",
    ),
    (
        "2ed18d4f78ca47f0a9bc25271c26adb4",
        "Init crashed waitpid failed",
    ),
    (
        "56b1cd96f24246c5b607666fda952356",
        "Init crashed coredump failed",
    ),
    ("4ac7566d4d7548f4981f629a28f0f829", "Init crashed coredump"),
    (
        "38e8b1e039ad469291b18b44c553a5b7",
        "Crash shell failed to fork",
    ),
    (
        "872729b47dbe473eb768ccecd477beda",
        "Crash shell failed to execute",
    ),
    ("658a67adc1c940b3b3316e7e8628834a", "Selinux failed"),
    ("e6f456bd92004d9580160b2207555186", "Battery low warning"),
    (
        "267437d33fdd41099ad76221cc24a335",
        "Battery low powering off",
    ),
    (
        "79e05b67bc4545d1922fe47107ee60c5",
        "Manager mainloop failed",
    ),
    ("dbb136b10ef4457ba47a795d62f108c9", "Manager no xdgdir path"),
    (
        "ed158c2df8884fa584eead2d902c1032",
        "Init failed to drop capability bounding set of usermode",
    ),
    (
        "42695b500df048298bee37159caa9f2e",
        "Init failed to drop capability bounding set",
    ),
    (
        "bfc2430724ab44499735b4f94cca9295",
        "User manager can't disable new privileges",
    ),
    (
        "59288af523be43a28d494e41e26e4510",
        "Manager failed to start default target",
    ),
    (
        "689b4fcc97b4486ea5da92db69c9e314",
        "Manager failed to isolate default target",
    ),
    (
        "5ed836f1766f4a8a9fc5da45aae23b29",
        "Manager failed to collect passed file descriptors",
    ),
    (
        "6a40fbfbd2ba4b8db02fb40c9cd090d7",
        "Init failed to fix up environment variables",
    ),
    (
        "0e54470984ac419689743d957a119e2e",
        "Manager failed to allocate",
    ),
    (
        "d67fa9f847aa4b048a2ae33535331adb",
        "Manager failed to write Smack",
    ),
    (
        "af55a6f75b544431b72649f36ff6d62c",
        "System shutdown critical error",
    ),
    (
        "d18e0339efb24a068d9c1060221048c2",
        "Init failed to fork off valgrind",
    ),
    ("7d4958e842da4a758f6c1cdc7b36dcc5", "Unit starting"),
    ("39f53479d3a045ac8e11786248231fbf", "Unit started"),
    ("be02cf6855d2428ba40df7e9d022f03d", "Unit failed"),
    ("de5b426a63be47a7b6ac3eaac82e2f6f", "Unit stopping"),
    ("9d1aaa27d60140bd96365438aad20286", "Unit stopped"),
    ("d34d037fff1847e6ae669a370e694725", "Unit reloading"),
    ("7b05ebc668384222baa8881179cfda54", "Unit reloaded"),
    ("5eb03494b6584870a536b337290809b3", "Unit restart scheduled"),
    ("ae8f7b866b0347b9af31fe1c80b127c0", "Unit resources"),
    ("7ad2d189f7e94e70a38c781354912448", "Unit success"),
    ("0e4284a0caca4bfc81c0bb6786972673", "Unit skipped"),
    ("d9b373ed55a64feb8242e02dbe79a49c", "Unit failure result"),
    (
        "641257651c1b4ec9a8624d7a40a9e1e7",
        "Process execution failed",
    ),
    ("98e322203f7a4ed290d09fe03c09fe15", "Unit process exited"),
    ("0027229ca0644181a76c4e92458afa2e", "Syslog forward missed"),
    (
        "1dee0369c7fc4736b7099b38ecb46ee7",
        "Mount point is not empty",
    ),
    ("d989611b15e44c9dbf31e3c81256e4ed", "Unit oomd kill"),
    ("fe6faa94e7774663a0da52717891d8ef", "Unit out of memory"),
    ("b72ea4a2881545a0b50e200e55b9b06f", "Lid opened"),
    ("b72ea4a2881545a0b50e200e55b9b070", "Lid closed"),
    ("f5f416b862074b28927a48c3ba7d51ff", "System docked"),
    ("51e171bd585248568110144c517cca53", "System undocked"),
    ("b72ea4a2881545a0b50e200e55b9b071", "Power key"),
    ("3e0117101eb243c1b9a50db3494ab10b", "Power key long press"),
    ("9fa9d2c012134ec385451ffe316f97d0", "Reboot key"),
    ("f1c59a58c9d943668965c337caec5975", "Reboot key long press"),
    ("b72ea4a2881545a0b50e200e55b9b072", "Suspend key"),
    ("bfdaf6d312ab4007bc1fe40a15df78e8", "Suspend key long press"),
    ("b72ea4a2881545a0b50e200e55b9b073", "Hibernate key"),
    (
        "167836df6f7f428e98147227b2dc8945",
        "Hibernate key long press",
    ),
    ("c772d24e9a884cbeb9ea12625c306c01", "Invalid configuration"),
    (
        "1675d7f172174098b1108bf8c7dc8f5d",
        "DNSSEC validation failed",
    ),
    (
        "4d4408cfd0d144859184d1e65d7c8a65",
        "DNSSEC trust anchor revoked",
    ),
    ("36db2dfa5a9045e1bd4af5f93e1cf057", "DNSSEC turned off"),
    ("b61fdac612e94b9182285b998843061f", "Username unsafe"),
    (
        "1b3bb94037f04bbf81028e135a12d293",
        "Mount point path not suitable",
    ),
    (
        "010190138f494e29a0ef6669749531aa",
        "Device path not suitable",
    ),
    ("b480325f9c394a7b802c231e51a2752c", "Nobody user unsuitable"),
    (
        "1c0454c1bd2241e0ac6fefb4bc631433",
        "Systemd udev settle deprecated",
    ),
    ("7c8a41f37b764941a0e1780b1be2f037", "Time initial sync"),
    ("7db73c8af0d94eeb822ae04323fe6ab6", "Time initial bump"),
    ("9e7066279dc8403da79ce4b1a69064b2", "Shutdown scheduled"),
    ("249f6fb9e6e2428c96f3f0875681ffa3", "Shutdown canceled"),
    ("3f7d5ef3e54f4302b4f0b143bb270cab", "TPM PCR Extended"),
    ("f9b0be465ad540d0850ad32172d57c21", "Memory Trimmed"),
    ("a8fa8dacdb1d443e9503b8be367a6adb", "SysV Service Found"),
    (
        "187c62eb1e7f463bb530394f52cb090f",
        "Portable Service attached",
    ),
    (
        "76c5c754d628490d8ecba4c9d042112b",
        "Portable Service detached",
    ),
    (
        "9cf56b8baf9546cf9478783a8de42113",
        "systemd-networkd sysctl changed by foreign process",
    ),
    (
        "ad7089f928ac4f7ea00c07457d47ba8a",
        "SRK into TPM authorization failure",
    ),
    (
        "b2bcbaf5edf948e093ce50bbea0e81ec",
        "Secure Attention Key (SAK) was pressed",
    ),
    ("7fc63312330b479bb32e598d47cef1a8", "dbus activate no unit"),
    (
        "ee9799dab1e24d81b7bee7759a543e1b",
        "dbus activate masked unit",
    ),
    ("a0fa58cafd6f4f0c8d003d16ccf9e797", "dbus broker exited"),
    ("c8c6cde1c488439aba371a664353d9d8", "dbus dirwatch"),
    ("8af3357071af4153af414daae07d38e7", "dbus dispatch stats"),
    ("199d4300277f495f84ba4028c984214c", "dbus no sopeergroup"),
    (
        "b209c0d9d1764ab38d13b8e00d1784d6",
        "dbus protocol violation",
    ),
    ("6fa70fa776044fa28be7a21daf42a108", "dbus receive failed"),
    (
        "0ce0fa61d1a9433dabd67417f6b8e535",
        "dbus service failed open",
    ),
    ("24dc708d9e6a4226a3efe2033bb744de", "dbus service invalid"),
    ("f15d2347662d483ea9bcd8aa1a691d28", "dbus sighup"),
    (
        "0ce153587afa4095832d233c17a88001",
        "Gnome SM startup succeeded",
    ),
    (
        "10dd2dc188b54a5e98970f56499d1f73",
        "Gnome SM unrecoverable failure",
    ),
    ("f3ea493c22934e26811cd62abe8e203a", "Gnome shell started"),
    ("c7b39b1e006b464599465e105b361485", "Flatpak cache"),
    ("75ba3deb0af041a9a46272ff85d9e73e", "Flathub pulls"),
    ("f02bce89a54e4efab3a94a797d26204a", "Flathub pull errors"),
    ("dd11929c788e48bdbb6276fb5f26b08a", "Boltd starting"),
    ("1e6061a9fbd44501b3ccc368119f2b69", "Netdata startup"),
    (
        "ed4cdb8f1beb4ad3b57cb3cae2d162fa",
        "Netdata connection from child",
    ),
    (
        "6e2e3839067648968b646045dbf28d66",
        "Netdata connection to parent",
    ),
    (
        "9ce0cb58ab8b44df82c4bf1ad9ee22de",
        "Netdata alert transition",
    ),
    (
        "6db0018e83e34320ae2a659d78019fb7",
        "Netdata alert notification",
    ),
    ("23e93dfccbf64e11aac858b9410d8a82", "Netdata fatal message"),
    (
        "8ddaf5ba33a74078b609250db1e951f3",
        "Sensor state transition",
    ),
    (
        "ec87a56120d5431bace51e2fb8bba243",
        "Netdata log flood protection",
    ),
    (
        "acb33cb95778476baac702eb7e4e151d",
        "Netdata Cloud connection",
    ),
    (
        "d1f59606dd4d41e3b217a0cfcae8e632",
        "Netdata extreme cardinality",
    ),
    ("02f47d350af5449197bf7a95b605a468", "Netdata exit reason"),
    (
        "4fdf40816c124623a032b7fe73beacb8",
        "Netdata dynamic configuration",
    ),
];
4748
4749fn message_id_name(raw: &str) -> Option<&'static str> {
4750 MESSAGE_ID_NAMES
4751 .iter()
4752 .find_map(|(candidate, name)| (*candidate == raw).then_some(*name))
4753}
4754
4755#[cfg(test)]
4756mod tests {
4757 use super::*;
4758 use crate::ExplorerHistogramBucket;
4759 use journal_core::file::{JournalFile, JournalFileOptions, JournalWriter, MmapMut};
4760 use journal_core::repository::File as RepoFile;
4761 use std::cell::Cell;
4762 use std::collections::HashMap;
4763 use tempfile::TempDir;
4764
    /// In-memory `NetdataFunctionState` implementation used by the tests in
    /// this module.
    #[derive(Default)]
    struct TestNetdataState {
        // Canned per-file metadata handed out by `file_metadata`, keyed by path.
        metadata: HashMap<PathBuf, NetdataJournalFileMetadata>,
        // Every `(path, delta_usec)` pair passed to the update hook, in call order.
        updates: Vec<(PathBuf, u64)>,
    }
4770
4771 impl NetdataFunctionState for TestNetdataState {
4772 fn file_metadata(&self, path: &Path) -> Option<NetdataJournalFileMetadata> {
4773 self.metadata.get(path).cloned()
4774 }
4775
4776 fn update_file_journal_vs_realtime_delta_usec(&mut self, path: &Path, delta_usec: u64) {
4777 self.updates.push((path.to_path_buf(), delta_usec));
4778 }
4779 }
4780
    /// Builds a deterministic UUID whose 16 bytes all equal `seed`.
    fn test_uuid(seed: u8) -> uuid::Uuid {
        uuid::Uuid::from_bytes([seed; 16])
    }
4784
4785 fn write_netdata_test_journal(directory: &std::path::Path, count: usize) {
4786 write_named_netdata_test_journal(
4787 directory,
4788 "netdata-api-test.journal",
4789 count,
4790 1_700_000_000_000_000,
4791 );
4792 }
4793
4794 fn write_named_netdata_test_journal(
4795 directory: &std::path::Path,
4796 name: &str,
4797 count: usize,
4798 start_realtime_usec: u64,
4799 ) {
4800 write_stepped_netdata_test_journal(directory, name, count, start_realtime_usec, 1);
4801 }
4802
4803 fn write_stepped_netdata_test_journal(
4804 directory: &std::path::Path,
4805 name: &str,
4806 count: usize,
4807 start_realtime_usec: u64,
4808 step_realtime_usec: u64,
4809 ) {
4810 std::fs::create_dir_all(directory).expect("create journal dir");
4811 let path = directory.join(name);
4812 let repo_file = RepoFile::from_path(&path).expect("repo file");
4813 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
4814 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
4815 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
4816 for index in 0..count {
4817 let message = format!("MESSAGE=row-{index}");
4818 let service = if index % 2 == 0 {
4819 b"SERVICE=even".as_slice()
4820 } else {
4821 b"SERVICE=odd".as_slice()
4822 };
4823 let payloads: [&[u8]; 3] = [message.as_bytes(), service, b"PRIORITY=6"];
4824 let realtime = start_realtime_usec
4825 .saturating_add((index as u64).saturating_mul(step_realtime_usec));
4826 writer
4827 .add_entry(&mut file, &payloads, realtime, realtime)
4828 .expect("write entry");
4829 }
4830 file.sync().expect("sync journal");
4831 }
4832
    /// Rows gathered by `collect_netdata_pages` across every page it fetched.
    struct NetdataCollectedPages {
        // `MESSAGE` column values, in the order the pages returned them.
        messages: Vec<String>,
        // `timestamp` column values, parallel to `messages`.
        timestamps: Vec<u64>,
    }
4837
4838 fn collect_netdata_pages(
4839 directory: &std::path::Path,
4840 direction: Direction,
4841 page_size: usize,
4842 ) -> NetdataCollectedPages {
4843 let mut out = NetdataCollectedPages {
4844 messages: Vec::new(),
4845 timestamps: Vec::new(),
4846 };
4847 let mut anchor = None;
4848 for page in 0..100 {
4849 let mut request = json!({
4850 "after": 1_700_000_000,
4851 "before": 1_800_000_000,
4852 "last": page_size,
4853 "direction": direction_name(direction),
4854 "data_only": true,
4855 });
4856 if let Some(anchor) = anchor {
4857 request["anchor"] = Value::from(anchor);
4858 }
4859 let response = run_netdata_contract_request(directory, request);
4860 assert_eq!(
4861 response["status"], 200,
4862 "page {page} response = {response:#}"
4863 );
4864 let messages = response_column_strings(&response, "MESSAGE");
4865 let timestamps = response_column_u64s(&response, "timestamp");
4866 assert_eq!(messages.len(), timestamps.len());
4867 if messages.is_empty() {
4868 break;
4869 }
4870 out.messages.extend(messages);
4871 out.timestamps.extend(timestamps.iter().copied());
4872 anchor = Some(match direction {
4873 Direction::Forward => timestamps[0],
4874 Direction::Backward => *timestamps.last().expect("page timestamp"),
4875 });
4876 if timestamps.len() < page_size {
4877 break;
4878 }
4879 }
4880 assert!(!out.messages.is_empty(), "pagination returned no rows");
4881 out
4882 }
4883
4884 fn run_netdata_contract_request(directory: &std::path::Path, request: Value) -> Value {
4885 NetdataJournalFunction::systemd_journal_plugin_compatible()
4886 .run_directory_request_json_with_options(
4887 directory,
4888 &request,
4889 NetdataFunctionRunOptions::from_timeout_seconds(0),
4890 )
4891 .expect("run function")
4892 }
4893
    /// Maps a `Direction` to the string used for the request's `direction` key.
    fn direction_name(direction: Direction) -> &'static str {
        match direction {
            Direction::Forward => "forward",
            Direction::Backward => "backward",
        }
    }
4900
4901 fn response_column_strings(response: &Value, field: &str) -> Vec<String> {
4902 let index = response_column_index(response, field);
4903 response["data"]
4904 .as_array()
4905 .expect("data")
4906 .iter()
4907 .map(|row| {
4908 row.as_array().expect("row")[index]
4909 .as_str()
4910 .expect("string cell")
4911 .to_string()
4912 })
4913 .collect()
4914 }
4915
4916 fn response_column_u64s(response: &Value, field: &str) -> Vec<u64> {
4917 let index = response_column_index(response, field);
4918 response["data"]
4919 .as_array()
4920 .expect("data")
4921 .iter()
4922 .map(|row| {
4923 row.as_array().expect("row")[index]
4924 .as_u64()
4925 .expect("u64 cell")
4926 })
4927 .collect()
4928 }
4929
4930 fn response_column_index(response: &Value, field: &str) -> usize {
4931 response["columns"][field]["index"]
4932 .as_u64()
4933 .expect("column index") as usize
4934 }
4935
4936 fn response_facet_count(response: &Value, key: &str, field: &str, value: &str) -> u64 {
4937 for facet in response[key].as_array().expect("facets") {
4938 if facet["id"] != field {
4939 continue;
4940 }
4941 for option in facet["options"].as_array().expect("facet options") {
4942 if option["id"] == value {
4943 return option["count"].as_u64().expect("facet count");
4944 }
4945 }
4946 panic!("{key} facet {field} missing value {value}");
4947 }
4948 panic!("{key} missing facet {field}");
4949 }
4950
4951 fn response_histogram_total(response: &Value, key: &str, value: &str) -> u64 {
4952 let chart = &response[key]["chart"];
4953 let names = chart["view"]["dimensions"]["names"]
4954 .as_array()
4955 .expect("histogram names");
4956 let dimension_index = names
4957 .iter()
4958 .position(|name| name.as_str() == Some(value))
4959 .expect("histogram dimension");
4960 chart["result"]["data"]
4961 .as_array()
4962 .expect("histogram data")
4963 .iter()
4964 .map(|point| {
4965 point.as_array().expect("histogram point")[dimension_index + 1]
4966 .as_array()
4967 .expect("histogram cell")[0]
4968 .as_u64()
4969 .unwrap_or_default()
4970 })
4971 .sum()
4972 }
4973
4974 fn assert_unique_messages(messages: &[String]) {
4975 let mut seen = HashSet::new();
4976 for message in messages {
4977 assert!(
4978 seen.insert(message),
4979 "duplicate message {message} in {messages:?}"
4980 );
4981 }
4982 }
4983
4984 #[test]
4985 fn parses_netdata_selections_as_and_fields_or_values() {
4986 let request = json!({
4987 "after": 200_000_000,
4988 "before": 200_000_100,
4989 "direction": "forward",
4990 "last": 25,
4991 "facets": ["PRIORITY"],
4992 "selections": {
4993 "PRIORITY": ["warning", "error"],
4994 "_HOSTNAME": ["node-a"],
4995 "__logs_sources": ["all-local-system-logs"],
4996 }
4997 });
4998
4999 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5000 .expect("parse request");
5001 assert_eq!(parsed.after_realtime_usec, Some(200_000_000_000_000));
5002 assert_eq!(parsed.before_realtime_usec, Some(200_000_100_999_999));
5003 assert_eq!(parsed.direction, Direction::Forward);
5004 assert_eq!(parsed.limit, 25);
5005 assert_eq!(parsed.filters.len(), 2);
5006 assert_eq!(parsed.filters[0].field, b"PRIORITY");
5007 assert_eq!(parsed.filters[0].values, vec![b"4".to_vec(), b"3".to_vec()]);
5008 assert_eq!(parsed.filters[1].field, b"_HOSTNAME");
5009 assert_eq!(parsed.filters[1].values, vec![b"node-a".to_vec()]);
5010 }
5011
5012 #[test]
5013 fn netdata_last_one_keeps_echo_and_uses_effective_minimum_two() {
5014 let request = json!({
5015 "after": 200_000_000,
5016 "before": 200_000_100,
5017 "last": 1
5018 });
5019
5020 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5021 .expect("parse request");
5022 let query =
5023 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5024
5025 assert_eq!(parsed.echo.get("last").and_then(Value::as_u64), Some(1));
5026 assert_eq!(parsed.limit, 2);
5027 assert_eq!(query.limit, 2);
5028 }
5029
5030 #[test]
5031 fn netdata_facet_counts_use_native_sliced_filter_semantics() {
5032 let request = json!({
5033 "after": 200_000_000,
5034 "before": 200_000_100,
5035 "facets": ["PRIORITY"],
5036 "selections": {
5037 "PRIORITY": ["warning"]
5038 }
5039 });
5040
5041 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5042 .expect("parse request");
5043 let query =
5044 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5045
5046 assert!(!query.exclude_facet_field_filters);
5047 }
5048
5049 #[test]
5050 fn netdata_multi_filter_facet_counts_exclude_same_field_filter() {
5051 let request = json!({
5052 "after": 200_000_000,
5053 "before": 200_000_100,
5054 "facets": ["PRIORITY", "_BOOT_ID"],
5055 "selections": {
5056 "PRIORITY": ["warning"],
5057 "_BOOT_ID": ["738043aea7b3417cbc3e9941ad26f769"]
5058 }
5059 });
5060
5061 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5062 .expect("parse request");
5063 let query =
5064 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5065
5066 assert!(query.exclude_facet_field_filters);
5067 }
5068
5069 #[test]
5070 fn parses_netdata_fts_query_like_simple_pattern() {
5071 let (terms, positives, negatives) =
5072 parse_fts_query_patterns(r"error|warning|!debug|escaped\|pipe|\!literal| a*B");
5073
5074 assert_eq!(
5075 positives,
5076 vec![
5077 b"error".to_vec(),
5078 b"warning".to_vec(),
5079 b"escaped|pipe".to_vec(),
5080 b"!literal".to_vec(),
5081 b" a*B".to_vec(),
5082 ]
5083 );
5084 assert_eq!(negatives, vec![b"debug".to_vec()]);
5085 assert_eq!(terms.len(), 6);
5086 assert!(!terms[0].negative);
5087 assert!(terms[2].negative);
5088 assert_eq!(
5089 terms[5],
5090 ExplorerFtsPattern {
5091 parts: vec![b" a".to_vec(), b"B".to_vec()],
5092 negative: false,
5093 }
5094 );
5095
5096 let request = json!({
5097 "query": r"alpha|!debug|needle\|pipe",
5098 "facets": ["PRIORITY"],
5099 });
5100 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5101 .expect("parse request");
5102 let query =
5103 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5104 assert_eq!(
5105 query.fts_patterns,
5106 vec![b"alpha".to_vec(), b"needle|pipe".to_vec()]
5107 );
5108 assert_eq!(query.fts_negative_patterns, vec![b"debug".to_vec()]);
5109 assert_eq!(query.fts_terms.len(), 3);
5110 }
5111
5112 #[test]
5113 fn netdata_requests_never_enable_debug_row_traversal_column_collection() {
5114 let request = json!({
5115 "facets": ["PRIORITY", "_HOSTNAME"],
5116 "histogram": "PRIORITY",
5117 "last": 25
5118 });
5119
5120 let parsed = NetdataRequest::parse(&request, &NetdataFunctionConfig::systemd_journal())
5121 .expect("parse request");
5122 let query =
5123 parsed.to_explorer_query(1, None, NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC);
5124
5125 assert!(!query.debug_collect_column_fields_by_row_traversal);
5126 }
5127
5128 #[test]
5129 fn netdata_function_suppresses_absent_requested_facet_groups() {
5130 let dir = TempDir::new().expect("tempdir");
5131 write_netdata_test_journal(dir.path(), 10);
5132 let request = json!({
5133 "after": 1_700_000_000,
5134 "before": 1_700_000_010,
5135 "facets": ["SERVICE", "MISSING_FIELD"],
5136 "histogram": "SERVICE",
5137 "last": 5,
5138 "slice": true
5139 });
5140 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5141
5142 let response = function
5143 .run_directory_request_json_with_options(
5144 dir.path(),
5145 &request,
5146 NetdataFunctionRunOptions::from_timeout_seconds(0),
5147 )
5148 .expect("run function");
5149
5150 let columns = response["columns"].as_object().expect("columns");
5151 assert!(columns.contains_key("SERVICE"));
5152 assert!(!columns.contains_key("MISSING_FIELD"));
5153 let facets = response["facets"].as_array().expect("facets");
5154 assert_eq!(
5155 facets
5156 .iter()
5157 .filter_map(|facet| facet["id"].as_str())
5158 .collect::<Vec<_>>(),
5159 vec!["SERVICE"]
5160 );
5161 let accepted = response["accepted_params"]
5162 .as_array()
5163 .expect("accepted params");
5164 assert!(accepted.iter().any(|value| value == "SERVICE"));
5165 assert!(!accepted.iter().any(|value| value == "MISSING_FIELD"));
5166 let histograms = response["available_histograms"]
5167 .as_array()
5168 .expect("available histograms");
5169 assert!(histograms.iter().any(|value| value["id"] == "SERVICE"));
5170 assert!(
5171 !histograms
5172 .iter()
5173 .any(|value| value["id"] == "MISSING_FIELD")
5174 );
5175 }
5176
5177 #[test]
5178 fn netdata_function_reports_zero_count_existing_facets_for_empty_results() {
5179 let dir = TempDir::new().expect("tempdir");
5180 write_netdata_test_journal(dir.path(), 10);
5181 let request = json!({
5182 "after": 1_700_000_000,
5183 "before": 1_700_000_010,
5184 "facets": ["PRIORITY"],
5185 "histogram": "PRIORITY",
5186 "selections": {
5187 "SERVICE": ["missing"]
5188 },
5189 "last": 5,
5190 "slice": true
5191 });
5192 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5193
5194 let response = function
5195 .run_directory_request_json_with_options(
5196 dir.path(),
5197 &request,
5198 NetdataFunctionRunOptions::from_timeout_seconds(0),
5199 )
5200 .expect("run function");
5201
5202 let facets = response["facets"].as_array().expect("facets");
5203 assert_eq!(facets.len(), 1);
5204 assert_eq!(facets[0]["id"], "PRIORITY");
5205 let options = facets[0]["options"].as_array().expect("options");
5206 assert!(options.iter().any(|option| {
5207 option["id"] == "6" && option["name"] == "info" && option["count"] == 0
5208 }));
5209 assert_eq!(response["items"]["matched"], 0);
5210 }
5211
5212 #[test]
5213 fn netdata_function_api_reports_progress() {
5214 let dir = TempDir::new().expect("tempdir");
5215 write_netdata_test_journal(dir.path(), 9_000);
5216 let request = json!({
5217 "after": 1_700_000_000,
5218 "before": 1_700_000_010,
5219 "facets": ["SERVICE"],
5220 "histogram": "SERVICE",
5221 "last": 0
5222 });
5223 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5224 let mut reports = 0u64;
5225 let mut progress = |progress: NetdataFunctionProgress| {
5226 reports = reports.saturating_add(1);
5227 assert_eq!(progress.current_file, 1);
5228 assert_eq!(progress.total_files, 1);
5229 assert!(progress.stats.rows_examined <= 9_000);
5230 };
5231 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5232 options.progress_interval = Duration::ZERO;
5233 options.progress_callback = Some(&mut progress);
5234
5235 let response = function
5236 .run_directory_request_json_with_options(dir.path(), &request, options)
5237 .expect("run function");
5238
5239 assert_eq!(response["status"], 200);
5240 assert!(reports > 0);
5241 assert_eq!(response["last_modified"], 1_700_000_000_008_999u64);
5242 }
5243
5244 #[test]
5245 fn netdata_function_api_reports_file_end_progress_for_small_scans() {
5246 let dir = TempDir::new().expect("tempdir");
5247 write_netdata_test_journal(dir.path(), 10);
5248 let request = json!({
5249 "after": 1_700_000_000,
5250 "before": 1_700_000_010,
5251 "facets": ["SERVICE"],
5252 "histogram": "SERVICE",
5253 "last": 0
5254 });
5255 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5256 let mut reports = 0u64;
5257 let mut last_rows_examined = 0u64;
5258 let mut progress = |progress: NetdataFunctionProgress| {
5259 reports = reports.saturating_add(1);
5260 last_rows_examined = progress.stats.rows_examined;
5261 };
5262 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5263 options.progress_callback = Some(&mut progress);
5264
5265 let response = function
5266 .run_directory_request_json_with_options(dir.path(), &request, options)
5267 .expect("run function");
5268
5269 assert_eq!(response["status"], 200);
5270 assert_eq!(reports, 1);
5271 assert_eq!(last_rows_examined, 10);
5272 }
5273
5274 #[test]
5275 fn netdata_function_progress_counts_only_query_files() {
5276 let dir = TempDir::new().expect("tempdir");
5277 write_named_netdata_test_journal(
5278 dir.path(),
5279 "old-window.journal",
5280 10,
5281 1_600_000_000_000_000,
5282 );
5283 write_named_netdata_test_journal(
5284 dir.path(),
5285 "current-window.journal",
5286 10,
5287 1_700_000_000_000_000,
5288 );
5289 let request = json!({
5290 "after": 1_700_000_000,
5291 "before": 1_700_000_010,
5292 "facets": ["SERVICE"],
5293 "histogram": "SERVICE",
5294 "last": 0
5295 });
5296 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
5297 let mut reports = Vec::new();
5298 let mut progress = |progress: NetdataFunctionProgress| {
5299 reports.push((progress.current_file, progress.total_files));
5300 };
5301 let mut options = NetdataFunctionRunOptions::from_timeout_seconds(0);
5302 options.progress_callback = Some(&mut progress);
5303
5304 let response = function
5305 .run_directory_request_json_with_options(dir.path(), &request, options)
5306 .expect("run function");
5307
5308 assert_eq!(response["status"], 200);
5309 assert_eq!(response["_journal_files"]["matched"], 1);
5310 assert_eq!(reports, vec![(1, 1)]);
5311 }
5312
/// A cancellation callback that always reports `true` must produce a 499
/// error object containing nothing but `status` and `errorMessage`.
#[test]
fn netdata_function_api_reports_cancellation() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let always_cancelled = || true;
    let run_options = NetdataFunctionRunOptions {
        cancellation_callback: Some(&always_cancelled),
        ..NetdataFunctionRunOptions::from_timeout_seconds(0)
    };

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 499);
    assert_eq!(reply["errorMessage"], "Request cancelled.");
    let field_count = reply.as_object().expect("response object").len();
    assert_eq!(
        field_count, 2,
        "plugin-compatible function errors only include status and errorMessage"
    );
}
5341
/// Cancellation requested from the progress callback, once rows have been
/// examined, must end the request with a 499 response.
#[test]
fn netdata_function_api_cancels_during_active_scan() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 9_000);
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let cancel_requested = Cell::new(false);
    let mut progress_calls = 0u64;
    let mut on_progress = |update: NetdataFunctionProgress| {
        progress_calls = progress_calls.saturating_add(1);
        // Only ask for cancellation after at least one row was examined.
        if update.stats.rows_examined > 0 {
            cancel_requested.set(true);
        }
    };
    let cancelled = || cancel_requested.get();
    let run_options = NetdataFunctionRunOptions {
        progress_interval: Duration::ZERO,
        progress_callback: Some(&mut on_progress),
        cancellation_callback: Some(&cancelled),
        ..NetdataFunctionRunOptions::from_timeout_seconds(0)
    };

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 499);
    assert_eq!(reply["errorMessage"], "Request cancelled.");
    assert!(progress_calls > 0);
    assert!(cancel_requested.get());
}
5377
/// The progress callback requests cancellation on every report; the request
/// must still finish as a 499 for a small ten-row journal.
#[test]
fn netdata_function_api_honors_cancellation_after_final_file_progress() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let cancel_requested = Cell::new(false);
    let mut on_progress = |_update: NetdataFunctionProgress| {
        cancel_requested.set(true);
    };
    let cancelled = || cancel_requested.get();
    let run_options = NetdataFunctionRunOptions {
        progress_callback: Some(&mut on_progress),
        cancellation_callback: Some(&cancelled),
        ..NetdataFunctionRunOptions::from_timeout_seconds(0)
    };

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 499);
    assert_eq!(reply["errorMessage"], "Request cancelled.");
    assert!(cancel_requested.get());
}
5407
/// A zero timeout must still yield a 200 response, flagged as partial and
/// carrying the timed-out warning message.
#[test]
fn netdata_function_api_reports_timeout_as_partial_table() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 10);
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 0
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let run_options = NetdataFunctionRunOptions {
        timeout: Some(Duration::ZERO),
        ..NetdataFunctionRunOptions::default()
    };

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 200);
    assert_eq!(reply["partial"], true);
    let message = &reply["message"];
    assert_eq!(message["status"], "warning");
    assert_eq!(message["title"], "Query timed-out, incomplete data. ");
}
5437
/// With `sampling` set, the response must expose non-zero sampled,
/// unsampled and estimated counters and a `notice` message.
#[test]
fn netdata_function_api_reports_sampling_counters() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        5_000,
        1_700_000_000_000_000,
        1_000,
    );
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_005,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
        "last": 5,
        "sampling": 20
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();

    let reply = plugin
        .run_directory_request_json_with_options(
            journal_dir.path(),
            &payload,
            NetdataFunctionRunOptions::from_timeout_seconds(0),
        )
        .expect("run function");

    // Missing or non-integer counters read as zero, which fails the checks below.
    let counter =
        |section: &str, name: &str| reply[section][name].as_u64().unwrap_or_default();

    assert_eq!(reply["status"], 200);
    assert!(counter("_sampling", "sampled") > 0);
    assert!(counter("_sampling", "unsampled") > 0);
    assert!(counter("_sampling", "estimated") > 0);
    assert_eq!(reply["items"]["estimated"], reply["_sampling"]["estimated"]);
    assert!(counter("items", "unsampled") < counter("_sampling", "unsampled"));
    assert_eq!(reply["message"]["status"], "notice");
}
5497
/// A `data_only` request must not report a `_sampling` section even when
/// `sampling` is supplied.
#[test]
fn netdata_function_api_disables_sampling_for_data_only() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_netdata_test_journal(journal_dir.path(), 5_000);
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "data_only": true,
        "last": 5,
        "sampling": 20
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 200);
    let sampling_section = reply.get("_sampling");
    assert!(sampling_section.is_none());
}
5522
/// With neither bound supplied, the window must span the 3600 seconds up to
/// and including the reference second.
#[test]
fn normalizes_missing_time_window_to_last_hour_like_plugin() {
    let (after_usec, before_usec) = normalize_time_window(1_000_000_000, None, None);

    assert_eq!(after_usec, Some(999_996_400_000_000));
    assert_eq!(before_usec, Some(1_000_000_000_999_999));
}
5530
/// When `after` is later than `before`, the two bounds must be swapped.
#[test]
fn normalizes_inverted_time_window_like_plugin() {
    let (after_usec, before_usec) =
        normalize_time_window(1_000_000_000, Some(200_000_100), Some(200_000_000));

    assert_eq!(after_usec, Some(200_000_000_000_000));
    assert_eq!(before_usec, Some(200_000_100_999_999));
}
5538
/// Equal bounds must expand to a window starting 3600 seconds before the
/// supplied instant.
#[test]
fn normalizes_equal_time_window_like_plugin() {
    let (after_usec, before_usec) =
        normalize_time_window(1_000_000_000, Some(200_000_000), Some(200_000_000));

    assert_eq!(after_usec, Some(199_996_400_000_000));
    assert_eq!(before_usec, Some(200_000_000_999_999));
}
5546
/// Small bound values are treated as offsets from the reference second
/// rather than absolute timestamps; this pins the resulting window.
#[test]
fn normalizes_relative_time_window_like_plugin() {
    let (after_usec, before_usec) = normalize_time_window(1_000_000_000, Some(100), Some(200));

    assert_eq!(after_usec, Some(999_999_701_000_000));
    assert_eq!(before_usec, Some(999_999_800_999_999));
}
5554
/// With only `before` supplied, `after` must be derived from it; this pins
/// the expected start, 599 seconds before the supplied bound.
#[test]
fn normalizes_missing_after_with_supplied_before_like_plugin() {
    let (after_usec, before_usec) = normalize_time_window(1_000_000_000, None, Some(200_000_000));

    assert_eq!(after_usec, Some(199_999_401_000_000));
    assert_eq!(before_usec, Some(200_000_000_999_999));
}
5562
/// `PRIORITY` and `SYSLOG_FACILITY` numbers must be shown by name, and
/// priorities must map to the expected row severities.
#[test]
fn systemd_profile_transforms_priority_and_facility_for_display() {
    let context = DisplayContext::default();
    let profile = SystemdJournalProfile;

    let priority = profile.field_display_value(&context, DisplayScope::Data, "PRIORITY", b"7");
    assert_eq!(priority, json!("debug"));

    let facility =
        profile.field_display_value(&context, DisplayScope::Data, "SYSLOG_FACILITY", b"3");
    assert_eq!(facility, json!("daemon"));

    let error_severity = priority_to_row_severity(b"3");
    assert_eq!(error_severity, "critical");
    let info_severity = priority_to_row_severity(b"6");
    assert_eq!(info_severity, "normal");
}
5578
/// Walks the process-name fallback order: `CONTAINER_NAME` wins over
/// `SYSLOG_IDENTIFIER`, which wins over `_COMM`; `_PID` (not `SYSLOG_PID`)
/// supplies the bracketed suffix; `_EXE` alone yields `-`.
#[test]
fn dynamic_process_name_matches_plugin_fallback_order() {
    /// Wraps one raw value as a single-entry field value list.
    fn single(value: &[u8]) -> Vec<Vec<u8>> {
        vec![value.to_vec()]
    }

    let mut fields = BTreeMap::from([
        ("SYSLOG_IDENTIFIER".to_owned(), single(b"syslog")),
        ("_COMM".to_owned(), single(b"comm")),
        ("_PID".to_owned(), single(b"42")),
        ("SYSLOG_PID".to_owned(), single(b"99")),
    ]);
    assert_eq!(dynamic_process_name(&fields), "syslog[42]");

    fields.insert("CONTAINER_NAME".to_owned(), single(b"container"));
    assert_eq!(dynamic_process_name(&fields), "container[42]");

    // Only `_COMM` and `SYSLOG_PID` remain after these removals.
    for removed in ["CONTAINER_NAME", "SYSLOG_IDENTIFIER", "_PID"] {
        fields.remove(removed);
    }
    assert_eq!(dynamic_process_name(&fields), "comm");

    fields.remove("_COMM");
    fields.insert("_EXE".to_owned(), single(b"/usr/bin/app"));
    assert_eq!(dynamic_process_name(&fields), "-");
}
5600
/// Two facet values that differ only beyond `NETDATA_FACET_MAX_VALUE_LENGTH`
/// bytes must collapse into one entry whose count is the sum of both.
#[test]
fn facet_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'a'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let first = [shared_prefix.as_slice(), &b"-first"[..]].concat();
    let second = [shared_prefix.as_slice(), &b"-second"[..]].concat();

    let mut counts = BTreeMap::new();
    add_netdata_facet_count(&mut counts, &first, 2);
    add_netdata_facet_count(&mut counts, &second, 3);

    assert_eq!(counts.len(), 1);
    let collapsed = counts.get(&shared_prefix);
    assert_eq!(collapsed, Some(&5));
}
5616
/// Histogram dimension values that share the first
/// `NETDATA_FACET_MAX_VALUE_LENGTH` bytes must render as one label whose
/// data point is the sum of both counts.
#[test]
fn histogram_values_are_truncated_and_collapsed_like_plugin() {
    let shared_prefix = vec![b'b'; NETDATA_FACET_MAX_VALUE_LENGTH];
    let first = [shared_prefix.as_slice(), &b"-first"[..]].concat();
    let second = [shared_prefix.as_slice(), &b"-second"[..]].concat();

    let histogram = ExplorerHistogram {
        field: b"TEST_FIELD".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_000_000,
            end_realtime_usec: 2_000_000,
            values: HashMap::from([(first, 2), (second, 3)]),
        }],
    };

    let function = NetdataJournalFunction::systemd_journal();
    let rendered = function.build_histogram(&DisplayContext::default(), &histogram, None);

    let result = &rendered["chart"]["result"];
    let labels = result["labels"].as_array().expect("labels");
    assert_eq!(labels.len(), 2);
    let expected_label = String::from_utf8(shared_prefix).unwrap();
    assert_eq!(labels[1], Value::String(expected_label));
    assert_eq!(result["data"][0][1][0], json!(5));
}
5646
/// Chart metadata must carry empty dimension arrays when a bucket has no
/// values, and populated arrays when it has one.
#[test]
fn histogram_chart_metadata_includes_empty_dimension_arrays() {
    let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
    // Single five-second bucket for `TRAP_SEVERITY` holding the given values.
    let single_bucket = |values| ExplorerHistogram {
        field: b"TRAP_SEVERITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_700_000_000_000_000,
            end_realtime_usec: 1_700_000_005_000_000,
            values,
        }],
    };

    let empty = function.build_histogram(
        &DisplayContext::default(),
        &single_bucket(HashMap::new()),
        None,
    );

    let empty_view = &empty["chart"]["view"]["dimensions"];
    assert_eq!(empty_view["names"], json!([]));
    assert_eq!(empty_view["ids"], json!([]));
    assert_eq!(empty["chart"]["db"]["dimensions"]["names"], json!([]));

    let with_value = function.build_histogram(
        &DisplayContext::default(),
        &single_bucket(HashMap::from([(b"warning".to_vec(), 7)])),
        None,
    );

    let populated_view = &with_value["chart"]["view"]["dimensions"];
    assert_eq!(populated_view["names"], json!(["warning"]));
    assert_eq!(populated_view["sts"]["min"], json!([7]));
}
5691
/// Duplicate timestamps must be made unique by stepping one microsecond in
/// the scan direction: downwards when backward, upwards when forward.
#[test]
fn duplicate_row_timestamps_match_plugin_direction_adjustment() {
    let mut backward = vec![100, 100, 100, 90]
        .into_iter()
        .map(test_located_row)
        .collect::<Vec<_>>();
    make_row_timestamps_unique(&mut backward, Direction::Backward);
    let backward_timestamps: Vec<_> = backward
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(backward_timestamps, vec![100, 99, 98, 90]);

    let mut forward = vec![90, 100, 100, 100]
        .into_iter()
        .map(test_located_row)
        .collect::<Vec<_>>();
    make_row_timestamps_unique(&mut forward, Direction::Forward);
    let forward_timestamps: Vec<_> = forward
        .iter()
        .map(|located| located.row.realtime_usec)
        .collect();
    assert_eq!(forward_timestamps, vec![90, 100, 101, 102]);
}
5724
/// Pins the matched/before/after counters of a forward page window fed two
/// timestamps below the anchor and three above it.
#[test]
fn page_window_counts_forward_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_005_000_000u64,
        "direction": "forward",
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut window = NetdataPageWindow::for_request(&request);

    let observed = [
        1_700_000_003_000_000,
        1_700_000_004_000_000,
        1_700_000_006_000_000,
        1_700_000_007_000_000,
        1_700_000_008_000_000,
    ];
    for timestamp in observed {
        window.observe(timestamp);
    }

    let counters = window.counters();
    assert_eq!(
        (counters.matched, counters.before, counters.after),
        (3, 1, 2)
    );
}
5756
/// Pins the matched/before/after counters of a backward page window fed one
/// timestamp above the anchor and three below it.
#[test]
fn page_window_counts_backward_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "direction": "backward",
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut window = NetdataPageWindow::for_request(&request);

    let observed = [
        1_700_000_009_000_000,
        1_700_000_007_000_000,
        1_700_000_006_000_000,
        1_700_000_005_000_000,
    ];
    for timestamp in observed {
        window.observe(timestamp);
    }

    let counters = window.counters();
    assert_eq!(
        (counters.matched, counters.before, counters.after),
        (3, 1, 1)
    );
}
5787
/// Pins the counters of a tail (`data_only` + `tail`) page window fed one
/// timestamp above the anchor, the anchor itself, and one below it.
#[test]
fn page_window_counts_tail_anchor_like_netdata_facets() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "anchor": 1_700_000_008_000_000u64,
        "if_modified_since": 1_700_000_008_000_000u64,
        "data_only": true,
        "tail": true,
        "last": 2
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse request");
    let mut window = NetdataPageWindow::for_request(&request);

    let observed = [
        1_700_000_009_000_000,
        1_700_000_008_000_000,
        1_700_000_007_000_000,
    ];
    for timestamp in observed {
        window.observe(timestamp);
    }

    let counters = window.counters();
    assert_eq!(
        (counters.matched, counters.before, counters.after),
        (1, 0, 2)
    );
}
5819
/// A tail poll anchored at the first row, filtered to `SERVICE=even`, must
/// answer 304 for a two-row journal.
#[test]
fn netdata_function_tail_anchor_with_no_new_filtered_rows_returns_304() {
    let journal_dir = TempDir::new().expect("tempdir");
    let first_row_usec = 1_700_000_000_000_000;
    // Two rows, one second apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "netdata-api-test.journal",
        2,
        first_row_usec,
        1_000_000,
    );
    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_002,
        "anchor": first_row_usec,
        "if_modified_since": first_row_usec,
        "data_only": true,
        "tail": true,
        "last": 5,
        "selections": {
            "SERVICE": ["even"]
        }
    });
    let plugin = NetdataJournalFunction::systemd_journal_plugin_compatible();
    let run_options = NetdataFunctionRunOptions::from_timeout_seconds(0);

    let reply = plugin
        .run_directory_request_json_with_options(journal_dir.path(), &payload, run_options)
        .expect("run function");

    assert_eq!(reply["status"], 304);
    assert_eq!(reply["errorMessage"], "No new data since the previous call.");
}
5859
/// Paging through seven rows two at a time, in either direction, must
/// return every row exactly once in the pinned order.
#[test]
fn netdata_function_pages_with_anchor_without_duplicate_or_missing_rows() {
    let journal_dir = TempDir::new().expect("tempdir");
    let start_realtime_usec = 1_700_000_000_000_000;
    // Seven rows spaced one microsecond apart.
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "paging-contract.journal",
        7,
        start_realtime_usec,
        1,
    );

    let backward = collect_netdata_pages(journal_dir.path(), Direction::Backward, 2);
    assert_eq!(
        backward.messages,
        vec![
            "row-6", "row-5", "row-4", "row-3", "row-2", "row-1", "row-0"
        ]
    );
    assert_unique_messages(&backward.messages);
    let backward_expected = [6, 5, 4, 3, 2, 1, 0].map(|offset| start_realtime_usec + offset);
    assert_eq!(backward.timestamps, backward_expected.to_vec());

    let forward = collect_netdata_pages(journal_dir.path(), Direction::Forward, 2);
    // The pinned forward order lists the newer row of each page first.
    assert_eq!(
        forward.messages,
        vec![
            "row-1", "row-0", "row-3", "row-2", "row-5", "row-4", "row-6"
        ]
    );
    assert_unique_messages(&forward.messages);
    let forward_expected = [1, 0, 3, 2, 5, 4, 6].map(|offset| start_realtime_usec + offset);
    assert_eq!(forward.timestamps, forward_expected.to_vec());
}
5914
/// Tail polls must return only rows newer than the anchor, report the
/// direction as `backward` whichever direction was requested, and answer
/// 304 once the anchor is the newest row.
#[test]
fn netdata_function_tail_polls_return_only_rows_after_anchor_then_304() {
    let journal_dir = TempDir::new().expect("tempdir");
    let start_realtime_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "tail-contract.journal",
        5,
        start_realtime_usec,
        1,
    );
    let anchor = start_realtime_usec + 2;

    for requested_direction in [Direction::Backward, Direction::Forward] {
        let payload = json!({
            "after": 1_700_000_000,
            "before": 1_700_000_010,
            "last": 5,
            "direction": direction_name(requested_direction),
            "data_only": true,
            "tail": true,
            "if_modified_since": anchor,
            "anchor": anchor,
        });
        let reply = run_netdata_contract_request(journal_dir.path(), payload);

        assert_eq!(reply["status"], 200);
        assert_eq!(reply["_request"]["direction"], "backward");
        let messages = response_column_strings(&reply, "MESSAGE");
        assert_eq!(messages, vec!["row-4", "row-3"]);
        let timestamps = response_column_u64s(&reply, "timestamp");
        assert_eq!(
            timestamps,
            vec![start_realtime_usec + 4, start_realtime_usec + 3]
        );
    }

    // Anchoring at the last row leaves nothing newer to return.
    let newest = start_realtime_usec + 4;
    let no_change = run_netdata_contract_request(
        journal_dir.path(),
        json!({
            "after": 1_700_000_000,
            "before": 1_700_000_010,
            "last": 5,
            "direction": "backward",
            "data_only": true,
            "tail": true,
            "if_modified_since": newest,
            "anchor": newest,
        }),
    );
    assert_eq!(no_change["status"], 304);
    assert_eq!(
        no_change["errorMessage"],
        "No new data since the previous call."
    );
}
5973
/// A tail + delta poll must report facet, histogram and item deltas for the
/// three rows newer than the anchor while returning only `last` rows.
#[test]
fn netdata_function_tail_delta_reports_exact_incremental_facets_and_histogram() {
    let journal_dir = TempDir::new().expect("tempdir");
    let start_realtime_usec = 1_700_000_000_000_000;
    write_stepped_netdata_test_journal(
        journal_dir.path(),
        "delta-contract.journal",
        5,
        start_realtime_usec,
        1,
    );
    let anchor = start_realtime_usec + 1;

    let payload = json!({
        "after": 1_700_000_000,
        "before": 1_700_000_010,
        "last": 2,
        "direction": "backward",
        "data_only": true,
        "delta": true,
        "tail": true,
        "if_modified_since": anchor,
        "anchor": anchor,
        "facets": ["SERVICE"],
        "histogram": "SERVICE",
    });
    let reply = run_netdata_contract_request(journal_dir.path(), payload);

    assert_eq!(reply["status"], 200);
    let messages = response_column_strings(&reply, "MESSAGE");
    assert_eq!(messages, vec!["row-4", "row-3"]);

    for (service, expected) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_facet_count(&reply, "facets_delta", "SERVICE", service),
            expected
        );
    }
    for (service, expected) in [("even", 2), ("odd", 1)] {
        assert_eq!(
            response_histogram_total(&reply, "histogram_delta", service),
            expected
        );
    }

    let items_delta = reply["items_delta"].as_object().expect("items_delta");
    assert_eq!(items_delta["matched"], 3);
    assert_eq!(items_delta["returned"], 2);
}
6029
/// Repeated identical timestamps must be bumped upwards by a forward
/// adjuster, one microsecond per repeat.
#[test]
fn realtime_adjuster_preserves_forward_state_across_file_boundaries() {
    let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Forward);

    let adjusted = [10, 10, 10].map(|timestamp| adjuster.adjust(timestamp));

    assert_eq!(adjusted, [10, 11, 12]);
}
6038
/// Repeated identical timestamps must be pushed downwards by a backward
/// adjuster, one microsecond per repeat.
#[test]
fn realtime_adjuster_preserves_backward_state_across_file_boundaries() {
    let mut adjuster = NetdataRealtimeAdjuster::new(Direction::Backward);

    let adjusted = [10, 10, 10].map(|timestamp| adjuster.adjust(timestamp));

    assert_eq!(adjusted, [10, 9, 8]);
}
6047
/// The default systemd profile must show `_UID` and `_GID` facet values as
/// the raw numeric strings.
#[test]
fn systemd_profile_keeps_user_group_ids_raw_by_default() {
    let profile = SystemdJournalProfile;
    let context = DisplayContext::default();

    for id_field in ["_UID", "_GID"] {
        let shown = profile.field_display_value(&context, DisplayScope::Facet, id_field, b"0");
        assert_eq!(shown, json!("0"));
    }
}
6061
/// The plugin-compatible profile must resolve id `0` to the name `root` for
/// both `_UID` and `_GID` facet values.
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_resolves_user_group_ids_explicitly() {
    let profile = SystemdJournalPluginProfile;
    let context = DisplayContext::default();

    for id_field in ["_UID", "_GID"] {
        let shown = profile.field_display_value(&context, DisplayScope::Facet, id_field, b"0");
        assert_eq!(shown, json!("root"));
    }
}
6076
/// Resolving the same uid/gid in both facet and data scope must leave
/// exactly one entry in each display cache.
#[cfg(unix)]
#[test]
fn plugin_compatible_profile_caches_user_group_resolution() {
    let profile = SystemdJournalPluginProfile;
    let context = DisplayContext::default();

    // Same order as a manual walk: each field in facet scope, then data scope.
    for id_field in ["_UID", "_GID"] {
        for scope in [DisplayScope::Facet, DisplayScope::Data] {
            let shown = profile.field_display_value(&context, scope, id_field, b"0");
            assert_eq!(shown, json!("root"));
        }
    }

    assert_eq!(context.uid_display_cache.borrow().len(), 1);
    assert_eq!(context.gid_display_cache.borrow().len(), 1);
}
6103
/// A file must still be considered for a request starting just inside
/// `NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC` past its last entry, and be
/// rejected for one starting just beyond it.
#[test]
fn file_overlap_uses_netdata_max_realtime_slack() {
    let file_first_seconds = 200_000_000u64;
    let file_last_seconds = 200_000_100u64;
    let slack_seconds = NETDATA_JOURNAL_VS_REALTIME_DELTA_MAX_USEC / 1_000_000;
    let slack_edge_seconds = file_last_seconds + slack_seconds;

    // Only the head/tail realtime fields carry meaningful values here.
    let header = crate::FileHeader {
        signature: *b"LPKSHHRH",
        compatible_flags: 0,
        incompatible_flags: 0,
        state: 0,
        header_size: 0,
        n_entries: 0,
        head_entry_realtime: file_first_seconds * 1_000_000,
        tail_entry_realtime: file_last_seconds * 1_000_000,
        head_entry_seqnum: 0,
        tail_entry_seqnum: 0,
        tail_entry_boot_id: [0; 16],
        seqnum_id: [0; 16],
    };
    let config = NetdataFunctionConfig::systemd_journal();
    // Both requests share the same upper bound and differ only in `after`.
    let request_starting_at = |after_seconds: u64| {
        NetdataRequest::parse(
            &json!({
                "after": after_seconds,
                "before": file_last_seconds + slack_seconds + 500
            }),
            &config,
        )
        .expect("parse request")
    };

    let inside_slack = request_starting_at(slack_edge_seconds - 1);
    assert!(file_may_overlap_request(header, &inside_slack));

    let outside_slack = request_starting_at(slack_edge_seconds + 1);
    assert!(!file_may_overlap_request(header, &outside_slack));
}
6145
/// A file with a later last message, a later modification time, or a later
/// first message must sort first when scanning backward; the last-message
/// case must sort last when scanning forward.
#[test]
fn journal_file_order_matches_plugin_comparator_shape() {
    // Builds order info with the default journal-vs-realtime delta.
    let order_info = |first_usec, last_usec, modified_usec| JournalFileOrderInfo {
        msg_first_realtime_usec: first_usec,
        msg_last_realtime_usec: last_usec,
        file_last_modified_usec: modified_usec,
        journal_vs_realtime_delta_usec: NETDATA_JOURNAL_VS_REALTIME_DELTA_DEFAULT_USEC,
    };

    let older = order_info(100, 200, 200);

    // Later last message wins even though its modification time is older.
    let newer = order_info(100, 300, 100);
    let backward = compare_journal_file_order(&newer, &older, Direction::Backward);
    assert_eq!(backward, Ordering::Less);
    let forward = compare_journal_file_order(&newer, &older, Direction::Forward);
    assert_eq!(forward, Ordering::Greater);

    // Same message range, later modification time.
    let newer_mtime = order_info(100, 200, 300);
    let by_mtime = compare_journal_file_order(&newer_mtime, &older, Direction::Backward);
    assert_eq!(by_mtime, Ordering::Less);

    // Same last message and modification time, later first message.
    let newer_first = order_info(150, 200, 200);
    let by_first = compare_journal_file_order(&newer_first, &older, Direction::Backward);
    assert_eq!(by_first, Ordering::Less);
}
6191
/// Recording several timestamps for one boot id must keep the smallest one,
/// whatever order they arrive in.
#[test]
fn boot_first_realtime_keeps_earliest_timestamp_like_plugin() {
    let mut earliest_by_boot = BTreeMap::new();

    for realtime_usec in [300, 100, 200] {
        record_boot_first_realtime(&mut earliest_by_boot, b"boot-a".to_vec(), realtime_usec);
    }

    let recorded = earliest_by_boot.get(b"boot-a".as_slice());
    assert_eq!(recorded, Some(&100));
}
6201
/// Merging histograms whose single buckets end at different times must
/// fail with the "inconsistent bucket shape" unsupported error.
#[test]
fn merge_histogram_rejects_inconsistent_bucket_shape() {
    // One empty `PRIORITY` bucket starting at 1s and ending at the given time.
    let single_bucket = |end_realtime_usec| ExplorerHistogram {
        field: b"PRIORITY".to_vec(),
        buckets: vec![ExplorerHistogramBucket {
            start_realtime_usec: 1_000_000,
            end_realtime_usec,
            values: HashMap::new(),
        }],
    };
    let mut target = Some(single_bucket(2_000_000));
    let source = single_bucket(1_500_000);

    let err = merge_histogram(&mut target, source).expect_err("shape mismatch");

    assert!(matches!(
        err,
        SdkError::Unsupported("inconsistent Netdata histogram bucket shape")
    ));
}
6227
/// A journal two directory levels below the root must be found, with
/// nothing skipped and no errors reported.
#[test]
fn collect_journal_files_recurses_nested_directories() {
    let root = TempDir::new().expect("tempdir");
    let nested_dir = root.path().join("machine").join("nested");
    write_named_netdata_test_journal(&nested_dir, "system.journal", 1, 1_700_000_000_000_000);

    let collection = collect_journal_files(root.path()).expect("collect files");

    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());
    let found_name = collection.files[0]
        .file_name()
        .and_then(|name| name.to_str());
    assert_eq!(found_name, Some("system.journal"));
}
6246
/// A directory reachable both directly and through a symlink must
/// contribute its journal only once.
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_directories() {
    let root = TempDir::new().expect("tempdir");
    let real_dir = root.path().join("real");
    let link_dir = root.path().join("link");
    write_named_netdata_test_journal(&real_dir, "system.journal", 1, 1_700_000_000_000_000);
    std::os::unix::fs::symlink(&real_dir, &link_dir).expect("symlink");

    let collection = collect_journal_files(root.path()).expect("collect files");

    let found = collection.files.len();
    assert_eq!(found, 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());
}
6262
/// A journal file and a symlink pointing at it must be collected as a
/// single file.
#[cfg(unix)]
#[test]
fn collect_journal_files_deduplicates_symlinked_files() {
    let root = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(root.path(), "system.journal", 1, 1_700_000_000_000_000);
    let real_file = root.path().join("system.journal");
    let link_file = root.path().join("system-copy.journal");
    std::os::unix::fs::symlink(real_file, link_file).expect("symlink");

    let collection = collect_journal_files(root.path()).expect("collect files");

    let found = collection.files.len();
    assert_eq!(found, 1);
    assert_eq!(collection.skipped, 0);
    assert!(collection.errors.is_empty());
}
6280
/// A subdirectory with mode `0o000` must be counted as skipped and produce
/// one error mentioning its name, while the readable journal next to it is
/// still collected.
///
/// The test returns early when the effective uid is 0 (presumably because
/// root is not stopped by the permission bits; confirm if this matters).
#[cfg(unix)]
#[test]
fn collect_journal_files_reports_unreadable_subdirectories() {
    use std::os::unix::fs::PermissionsExt;

    // SAFETY: `geteuid` takes no arguments, has no preconditions and only
    // returns the effective user id of the calling process.
    if unsafe { libc::geteuid() } == 0 {
        return;
    }

    let dir = TempDir::new().expect("tempdir");
    std::fs::write(dir.path().join("visible.journal"), b"").expect("journal");
    let locked = dir.path().join("locked");
    std::fs::create_dir(&locked).expect("locked dir");
    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o000))
        .expect("lock dir");

    // Hold the result without unwrapping so the directory is made accessible
    // again even when collection fails; otherwise the temp dir cleanup could
    // not remove the locked directory.
    let collection = collect_journal_files(dir.path());

    std::fs::set_permissions(&locked, std::fs::Permissions::from_mode(0o700))
        .expect("unlock dir");
    let collection = collection.expect("collect files");

    assert_eq!(collection.files.len(), 1);
    assert_eq!(collection.skipped, 1);
    assert_eq!(collection.errors.len(), 1);
    assert!(collection.errors[0].contains("locked"));
}
6306
/// When caller-supplied metadata provides only the first message time, the
/// summary must keep that value and still report a last message time
/// (per the test name, taken from the file header).
#[test]
fn source_summary_fills_missing_caller_metadata_from_header() {
    let journal_dir = TempDir::new().expect("tempdir");
    write_named_netdata_test_journal(
        journal_dir.path(),
        "system.journal",
        2,
        1_700_000_000_000_000,
    );
    let journal_path = journal_dir.path().join("system.journal");

    // Caller metadata knows an earlier first timestamp and nothing else.
    let partial_metadata = NetdataJournalFileMetadata {
        msg_first_realtime_usec: Some(1_699_999_999_000_000),
        ..NetdataJournalFileMetadata::default()
    };
    let mut caller_state = TestNetdataState::default();
    caller_state
        .metadata
        .insert(journal_path.clone(), partial_metadata);
    let run_options = NetdataFunctionRunOptions {
        state: Some(&mut caller_state),
        ..NetdataFunctionRunOptions::default()
    };

    let summary =
        JournalSourceSummary::from_paths(&[journal_path], ReaderOptions::default(), &run_options);

    assert_eq!(summary.first_realtime_usec, Some(1_699_999_999_000_000));
    assert_eq!(summary.last_realtime_usec, Some(1_700_000_000_000_001));
}
6330
/// Selecting `all-local-system-logs` must set and echo the local-system
/// source type, echo a null selection entry, and match system journals
/// while rejecting user journals by file name.
#[test]
fn source_selection_echoes_and_filters_known_groups() {
    let config = NetdataFunctionConfig::systemd_journal();
    let payload = json!({
        "selections": {
            "__logs_sources": ["all-local-system-logs"]
        }
    });
    let request = NetdataRequest::parse(&payload, &config).expect("parse source-filtered request");

    assert_eq!(request.source_type, SOURCE_TYPE_LOCAL_SYSTEM);
    let echoed_source_type = request.echo.get("source_type").and_then(Value::as_u64);
    assert_eq!(echoed_source_type, Some(SOURCE_TYPE_LOCAL_SYSTEM));
    let echoed_selection = request.echo.pointer("/selections/__logs_sources/0");
    assert!(echoed_selection.is_some_and(Value::is_null));

    let system_journal = Path::new("/var/log/journal/machine/system.journal");
    assert!(request.matches_source(system_journal, None));
    let user_journal = Path::new("/var/log/journal/machine/user-1000.journal");
    assert!(!request.matches_source(user_journal, None));
}
6361
6362 #[test]
6363 fn source_selection_uses_caller_metadata_before_filename_fallback() {
6364 let dir = TempDir::new().expect("tempdir");
6365 write_named_netdata_test_journal(dir.path(), "user-1000.journal", 1, 1_700_000_000_000_000);
6366 let path = dir.path().join("user-1000.journal");
6367 let config = NetdataFunctionConfig::systemd_journal();
6368 let request = NetdataRequest::parse(
6369 &json!({
6370 "after": 1_700_000_000,
6371 "before": 1_700_000_001,
6372 "selections": {
6373 "__logs_sources": ["all-local-system-logs"]
6374 }
6375 }),
6376 &config,
6377 )
6378 .expect("parse source-filtered request");
6379 assert!(!request.matches_source(&path, None));
6380
6381 let mut state = TestNetdataState::default();
6382 state.metadata.insert(
6383 path.clone(),
6384 NetdataJournalFileMetadata {
6385 source_type: Some(NETDATA_SOURCE_TYPE_LOCAL_SYSTEM),
6386 source_name: Some("system-registry".to_string()),
6387 ..NetdataJournalFileMetadata::default()
6388 },
6389 );
6390 let options = NetdataFunctionRunOptions {
6391 state: Some(&mut state),
6392 ..NetdataFunctionRunOptions::default()
6393 };
6394 let selected =
6395 select_journal_files_for_request(vec![path], &request, config.reader_options, &options);
6396
6397 assert_eq!(selected.files.len(), 1);
6398 }
6399
6400 #[test]
6401 fn netdata_function_state_receives_learned_source_realtime_delta() {
6402 let dir = TempDir::new().expect("tempdir");
6403 let commit_realtime_usec = 1_700_000_030_000_000;
6404 let source_realtime_usec = commit_realtime_usec - 30_000_000;
6405 write_source_realtime_delta_journal(
6406 dir.path(),
6407 "system.journal",
6408 commit_realtime_usec,
6409 source_realtime_usec,
6410 );
6411 let request = json!({
6412 "after": 1_700_000_029,
6413 "before": 1_700_000_031,
6414 "facets": ["SERVICE"],
6415 "histogram": "SERVICE",
6416 "last": 1,
6417 "sampling": 0
6418 });
6419 let function = NetdataJournalFunction::systemd_journal_plugin_compatible();
6420 let mut state = TestNetdataState::default();
6421 let options = NetdataFunctionRunOptions {
6422 state: Some(&mut state),
6423 ..NetdataFunctionRunOptions::from_timeout_seconds(0)
6424 };
6425
6426 let response = function
6427 .run_directory_request_json_with_options(dir.path(), &request, options)
6428 .expect("run function");
6429
6430 assert_eq!(response["status"], 200);
6431 assert_eq!(state.updates.len(), 1);
6432 assert_eq!(state.updates[0].1, 30_000_000);
6433 }
6434
6435 #[test]
6436 fn source_classification_matches_plugin_filename_shape() {
6437 assert_eq!(
6438 journal_file_source_type(Path::new("/var/log/journal/machine/system.journal")),
6439 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_SYSTEM
6440 );
6441 assert_eq!(
6442 journal_file_source_type(Path::new("/var/log/journal/machine/user-1000.journal")),
6443 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_USER
6444 );
6445 assert_eq!(
6446 journal_file_source_type(Path::new("/var/log/journal/machine/other.journal")),
6447 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_OTHER
6448 );
6449 assert_eq!(
6450 journal_file_source_type(Path::new(
6451 "/var/log/journal/machine.namespace/system.journal"
6452 )),
6453 SOURCE_TYPE_ALL | SOURCE_TYPE_LOCAL_ALL | SOURCE_TYPE_LOCAL_NAMESPACE
6454 );
6455 assert_eq!(
6456 journal_file_source_type(Path::new(
6457 "/var/log/journal/remote/remote-host-a@machine.journal"
6458 )),
6459 SOURCE_TYPE_ALL | SOURCE_TYPE_REMOTE_ALL
6460 );
6461 }
6462
6463 #[test]
6464 fn exact_source_names_follow_plugin_prefixes() {
6465 assert_eq!(
6466 journal_file_exact_source_name(Path::new(
6467 "/var/log/journal/machine.namespace/system.journal"
6468 ))
6469 .as_deref(),
6470 Some("namespace-namespace")
6471 );
6472 assert_eq!(
6473 journal_file_exact_source_name(Path::new(
6474 "/var/log/journal/remote/remote-host-a@machine.journal"
6475 ))
6476 .as_deref(),
6477 Some("remote-host-a")
6478 );
6479 assert_eq!(
6480 journal_file_exact_source_name(Path::new(
6481 "/var/log/journal/remote/remote-host-b.journal~.zst"
6482 ))
6483 .as_deref(),
6484 Some("remote-host-b")
6485 );
6486 }
6487
6488 #[test]
6489 fn disposed_journal_extension_matches_plugin_scan_contract() {
6490 assert!(is_journal_file_name(Path::new("active.journal")));
6491 assert!(is_journal_file_name(Path::new("archived.journal~")));
6492 assert!(is_journal_file_name(Path::new("active.journal.zst")));
6493 assert!(is_journal_file_name(Path::new("archived.journal~.zst")));
6494 }
6495
6496 fn test_located_row(realtime_usec: u64) -> LocatedRow {
6497 LocatedRow {
6498 file_path: PathBuf::from("test.journal"),
6499 row: ExplorerRow {
6500 realtime_usec,
6501 cursor: String::new(),
6502 payloads: Vec::new(),
6503 },
6504 }
6505 }
6506
6507 fn write_source_realtime_delta_journal(
6508 directory: &std::path::Path,
6509 name: &str,
6510 commit_realtime_usec: u64,
6511 source_realtime_usec: u64,
6512 ) {
6513 std::fs::create_dir_all(directory).expect("create journal dir");
6514 let path = directory.join(name);
6515 let repo_file = RepoFile::from_path(&path).expect("repo file");
6516 let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
6517 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
6518 let mut writer = JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer");
6519 let source = format!("_SOURCE_REALTIME_TIMESTAMP={source_realtime_usec}");
6520 let payloads: [&[u8]; 4] = [
6521 b"MESSAGE=source lag test".as_slice(),
6522 b"SERVICE=delta".as_slice(),
6523 b"PRIORITY=6".as_slice(),
6524 source.as_bytes(),
6525 ];
6526 writer
6527 .add_entry(
6528 &mut file,
6529 &payloads,
6530 commit_realtime_usec,
6531 commit_realtime_usec,
6532 )
6533 .expect("write entry");
6534 file.sync().expect("sync journal");
6535 }
6536}