1use std::collections::VecDeque;
30use std::fmt::Write as _;
31use std::io::{self, Write};
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35use parking_lot::Mutex;
36
37use crate::cluster::peer::PeerState;
38use crate::conf::ConfPool;
39use crate::embed::ServerHandle;
40use crate::stats::PoolField;
41
/// Maximum number of entries retained by [`RecentEvents`]; pushing onto a
/// full log evicts the oldest entry first.
pub const MAX_RECENT_EVENTS: usize = 50;
50
/// A named, ordered list of `key=value` pairs.
///
/// Pairs keep insertion order; duplicate keys are not rejected.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Section {
    name: String,
    pairs: Vec<(String, String)>,
}

impl Section {
    /// Creates a section called `name` pre-filled with owned copies of `pairs`.
    #[must_use]
    pub fn with_pairs(name: &str, pairs: &[(&str, &str)]) -> Self {
        let mut owned = Vec::with_capacity(pairs.len());
        for &(key, value) in pairs {
            owned.push((key.to_owned(), value.to_owned()));
        }
        Self {
            name: name.to_owned(),
            pairs: owned,
        }
    }

    /// The section's name.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// All pairs, in insertion order.
    #[must_use]
    pub fn pairs(&self) -> &[(String, String)] {
        self.pairs.as_slice()
    }

    /// Appends one `key=value` pair at the end of the section.
    pub fn push<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
        let entry = (key.into(), value.into());
        self.pairs.push(entry);
    }
}
110
/// A named, ordered list of free-form text rows.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RowSection {
    name: String,
    rows: Vec<String>,
}

impl RowSection {
    /// Creates an empty row section called `name`.
    #[must_use]
    pub fn new(name: &str) -> Self {
        Self {
            name: String::from(name),
            ..Self::default()
        }
    }

    /// The section's name.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// All rows, in insertion order.
    #[must_use]
    pub fn rows(&self) -> &[String] {
        self.rows.as_slice()
    }

    /// Appends one row at the end of the section.
    pub fn push<S: Into<String>>(&mut self, row: S) {
        let line: String = row.into();
        self.rows.push(line);
    }
}
146
/// One entry in the recent-events log.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RecentEvent {
    /// Event time in whole seconds since the Unix epoch.
    pub ts_secs: u64,
    /// Short event kind label (e.g. `restart`).
    pub kind: String,
    /// Optional free-form detail; may be empty.
    pub detail: String,
}

impl RecentEvent {
    /// Creates an event at `ts_secs` with the given `kind` and `detail`.
    #[must_use]
    pub fn new<K: Into<String>, D: Into<String>>(ts_secs: u64, kind: K, detail: D) -> Self {
        let (kind, detail) = (kind.into(), detail.into());
        Self {
            ts_secs,
            kind,
            detail,
        }
    }
}
184
185#[derive(Clone, Debug, Default)]
202pub struct RecentEvents {
203 inner: Arc<Mutex<VecDeque<RecentEvent>>>,
204}
205
206impl RecentEvents {
207 #[must_use]
209 pub fn new() -> Self {
210 Self {
211 inner: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_RECENT_EVENTS))),
212 }
213 }
214
215 pub fn push(&self, event: RecentEvent) {
218 let mut guard = self.inner.lock();
219 if guard.len() == MAX_RECENT_EVENTS {
220 guard.pop_front();
221 }
222 guard.push_back(event);
223 }
224
225 #[must_use]
227 pub fn snapshot(&self) -> Vec<RecentEvent> {
228 self.inner.lock().iter().cloned().collect()
229 }
230
231 #[must_use]
233 pub fn len(&self) -> usize {
234 self.inner.lock().len()
235 }
236
237 #[must_use]
239 pub fn is_empty(&self) -> bool {
240 self.inner.lock().is_empty()
241 }
242}
243
244#[derive(Clone, Debug, Default, Eq, PartialEq)]
261pub struct ClusterInfoSnapshot {
262 pub build: Section,
264 pub config: Section,
266 pub ring: Section,
268 pub peers: RowSection,
270 pub queues: Section,
272 pub gossip: Section,
274 pub recent_events: RowSection,
276 pub memory: Section,
278 pub fds: Section,
280}
281
282impl ClusterInfoSnapshot {
283 #[must_use]
298 pub fn synthetic() -> Self {
299 let mut snap = Self {
300 build: Section::with_pairs(
301 "build",
302 &[
303 ("version", "0.0.1"),
304 ("git_sha", "unknown"),
305 ("build_profile", "debug"),
306 ],
307 ),
308 config: Section::with_pairs(
309 "config",
310 &[
311 ("data_store", "redis"),
312 ("listen", "0.0.0.0:8102"),
313 ("dyn_listen", "0.0.0.0:8101"),
314 ("rack", "rack-1"),
315 ("dc", "dc-1"),
316 ],
317 ),
318 ring: Section::with_pairs(
319 "ring",
320 &[
321 ("distribution", "vnode"),
322 ("vnodes", "1"),
323 ("tokens_per_node", "1"),
324 ],
325 ),
326 peers: RowSection::new("peers"),
327 queues: Section::with_pairs(
328 "queues",
329 &[
330 ("dispatcher_inflight", "0"),
331 ("backend_supervisor_pending", "0"),
332 ("hint_store_size", "0"),
333 ],
334 ),
335 gossip: Section::with_pairs(
336 "gossip",
337 &[
338 ("churn_total", "0"),
339 ("phi_alarms_total", "0"),
340 ("heartbeats_sent", "0"),
341 ("heartbeats_received", "0"),
342 ],
343 ),
344 recent_events: RowSection::new("recent_events"),
345 memory: Section::with_pairs("memory", &[("rss_bytes", "unavailable")]),
346 fds: Section::with_pairs("fds", &[("total", "unavailable")]),
347 };
348 snap.peers.push(
349 "peer_id=local dc=dc-1 rack=rack-1 status=Normal phi=0.00 last_seen_ms=0 tokens=1",
350 );
351 snap.recent_events
352 .push("1970-01-01T00:00:00Z restart local-bootstrap");
353 snap
354 }
355}
356
357pub fn format_text<W: Write>(snap: &ClusterInfoSnapshot, w: &mut W) -> io::Result<()> {
392 write_pair_section(w, &snap.build)?;
393 write_pair_section(w, &snap.config)?;
394 write_pair_section(w, &snap.ring)?;
395 write_row_section(w, &snap.peers)?;
396 write_pair_section(w, &snap.queues)?;
397 write_pair_section(w, &snap.gossip)?;
398 write_row_section(w, &snap.recent_events)?;
399 write_pair_section(w, &snap.memory)?;
400 write_pair_section(w, &snap.fds)?;
401 Ok(())
402}
403
404fn write_pair_section<W: Write>(w: &mut W, s: &Section) -> io::Result<()> {
405 writeln!(w, "=== {} ===", s.name())?;
406 for (k, v) in &s.pairs {
407 writeln!(w, "{k}={v}")?;
408 }
409 writeln!(w)
410}
411
412fn write_row_section<W: Write>(w: &mut W, s: &RowSection) -> io::Result<()> {
413 writeln!(w, "=== {} ===", s.name())?;
414 for r in &s.rows {
415 writeln!(w, "{r}")?;
416 }
417 writeln!(w)
418}
419
420#[must_use]
424pub fn build_version() -> &'static str {
425 env!("CARGO_PKG_VERSION")
426}
427
/// Git SHA baked in through the `DYNOMITE_GIT_SHA` environment variable at
/// compile time, or `"unknown"` when it was not set during the build.
#[must_use]
pub fn build_git_sha() -> &'static str {
    match option_env!("DYNOMITE_GIT_SHA") {
        Some(sha) => sha,
        None => "unknown",
    }
}
437
/// `"debug"` when compiled with debug assertions enabled, `"release"` otherwise.
#[must_use]
pub fn build_profile() -> &'static str {
    const PROFILE: &str = if cfg!(debug_assertions) {
        "debug"
    } else {
        "release"
    };
    PROFILE
}
448
449#[must_use]
451pub fn build_section() -> Section {
452 Section::with_pairs(
453 "build",
454 &[
455 ("version", build_version()),
456 ("git_sha", build_git_sha()),
457 ("build_profile", build_profile()),
458 ],
459 )
460}
461
/// Config keys treated as secrets (currently only the Redis password).
const SECRET_FIELDS: &[&str] = &["redis_requirepass"];

/// Reports whether `key` exactly (case-sensitively) names a secret config field.
#[must_use]
pub fn is_secret_config_field(key: &str) -> bool {
    SECRET_FIELDS.iter().any(|&field| field == key)
}
472
473#[must_use]
494pub fn config_section(pool: &ConfPool) -> Section {
495 let mut s = Section::with_pairs("config", &[]);
496 push_config_listeners(&mut s, pool);
497 push_config_topology(&mut s, pool);
498 push_config_security(&mut s, pool);
499 push_config_runtime(&mut s, pool);
500 s
501}
502
503fn push_config_listeners(s: &mut Section, pool: &ConfPool) {
504 let listen = pool
505 .listen
506 .as_ref()
507 .map_or_else(|| "unset".to_string(), endpoint_to_string);
508 let dyn_listen = pool
509 .dyn_listen
510 .as_ref()
511 .map_or_else(|| "unset".to_string(), endpoint_to_string);
512 let stats_listen = pool
513 .stats_listen
514 .as_ref()
515 .map_or_else(|| "unset".to_string(), endpoint_to_string);
516 let data_store = match pool.data_store {
517 Some(0) => "redis".to_string(),
518 Some(1) => "memcache".to_string(),
519 Some(2) => "noxu".to_string(),
520 Some(n) => format!("unknown({n})"),
521 None => "unset".to_string(),
522 };
523 s.push("data_store", data_store);
524 s.push("listen", listen);
525 s.push("dyn_listen", dyn_listen);
526 s.push("stats_listen", stats_listen);
527}
528
529fn push_config_topology(s: &mut Section, pool: &ConfPool) {
530 s.push("rack", pool.rack.clone().unwrap_or_else(|| "unset".into()));
531 s.push(
532 "dc",
533 pool.datacenter.clone().unwrap_or_else(|| "unset".into()),
534 );
535 s.push("env", pool.env.clone().unwrap_or_else(|| "unset".into()));
536 s.push(
537 "distribution",
538 pool.resolved_distribution().as_str().to_string(),
539 );
540 s.push(
541 "hash",
542 pool.hash
543 .map_or_else(|| "unset".to_string(), |h| h.as_str().to_string()),
544 );
545}
546
547fn push_config_security(s: &mut Section, pool: &ConfPool) {
548 s.push(
549 "secure_server_option",
550 pool.secure_server_option
551 .clone()
552 .unwrap_or_else(|| "unset".into()),
553 );
554 s.push(
555 "pem_key_file",
556 pool.pem_key_file.clone().unwrap_or_else(|| "unset".into()),
557 );
558 s.push(
559 "recon_key_file",
560 pool.recon_key_file
561 .clone()
562 .unwrap_or_else(|| "unset".into()),
563 );
564 s.push(
565 "recon_iv_file",
566 pool.recon_iv_file.clone().unwrap_or_else(|| "unset".into()),
567 );
568 s.push(
569 "redis_requirepass",
570 if pool.redis_requirepass.is_some() {
571 "redacted".to_string()
572 } else {
573 "unset".to_string()
574 },
575 );
576}
577
578fn push_config_runtime(s: &mut Section, pool: &ConfPool) {
579 s.push(
580 "timeout_ms",
581 pool.timeout
582 .map_or_else(|| "unset".to_string(), |n| n.to_string()),
583 );
584 s.push(
585 "preconnect",
586 pool.preconnect
587 .map_or_else(|| "unset".to_string(), |b| b.to_string()),
588 );
589 s.push(
590 "auto_eject_hosts",
591 pool.auto_eject_hosts
592 .map_or_else(|| "unset".to_string(), |b| b.to_string()),
593 );
594 s.push(
595 "enable_hinted_handoff",
596 pool.enable_hinted_handoff
597 .map_or_else(|| "unset".to_string(), |b| b.to_string()),
598 );
599 s.push(
600 "enable_gossip",
601 pool.enable_gossip
602 .map_or_else(|| "unset".to_string(), |b| b.to_string()),
603 );
604 s.push(
605 "read_consistency",
606 pool.read_consistency
607 .clone()
608 .unwrap_or_else(|| "unset".into()),
609 );
610 s.push(
611 "write_consistency",
612 pool.write_consistency
613 .clone()
614 .unwrap_or_else(|| "unset".into()),
615 );
616}
617
618fn endpoint_to_string(l: &crate::conf::ConfListen) -> String {
619 format!("{}:{}", l.name(), l.port())
620}
621
622#[must_use]
625pub fn ring_section(distribution: &str, ring_entries: usize, tokens_per_node: usize) -> Section {
626 let mut s = Section::with_pairs("ring", &[]);
627 s.push("distribution", distribution);
628 s.push("vnodes", ring_entries.to_string());
629 s.push("tokens_per_node", tokens_per_node.to_string());
630 s
631}
632
633#[must_use]
635pub fn peer_row(
636 peer_id: &str,
637 dc: &str,
638 rack: &str,
639 state: PeerState,
640 phi: f64,
641 last_seen_ms: i64,
642 tokens: usize,
643) -> String {
644 format!(
645 "peer_id={peer_id} dc={dc} rack={rack} status={status} phi={phi:.2} \
646 last_seen_ms={last_seen_ms} tokens={tokens}",
647 status = state.name(),
648 )
649}
650
651#[must_use]
653pub fn queues_section(snap: &crate::stats::Snapshot) -> Section {
654 let mut s = Section::with_pairs("queues", &[]);
655 let dispatcher = snap.pool.metrics[PoolField::DnodeClientInQueue.index()]
656 + snap.pool.metrics[PoolField::DnodeClientOutQueue.index()];
657 let backend = snap.server.metrics[crate::stats::ServerField::InQueue.index()]
658 + snap.server.metrics[crate::stats::ServerField::OutQueue.index()];
659 let hint_store = snap.pool.metrics[PoolField::PeerInQueue.index()]
660 + snap.pool.metrics[PoolField::RemotePeerInQueue.index()];
661 s.push("dispatcher_inflight", dispatcher.to_string());
662 s.push("backend_supervisor_pending", backend.to_string());
663 s.push("hint_store_size", hint_store.to_string());
664 s
665}
666
667#[must_use]
671pub fn gossip_section(snap: &crate::stats::Snapshot) -> Section {
672 let mut s = Section::with_pairs("gossip", &[]);
673 let churn = snap.pool.metrics[PoolField::StatsCount.index()];
674 let alarms: i64 = snap
678 .failure
679 .peer_phi
680 .iter()
681 .filter(|p| p.phi >= 8.0)
682 .count()
683 .try_into()
684 .unwrap_or(i64::MAX);
685 let transitions: i64 = snap
686 .failure
687 .peer_state_transitions
688 .iter()
689 .map(|t| i64::try_from(t.count).unwrap_or(i64::MAX))
690 .sum();
691 s.push("churn_total", transitions.to_string());
692 s.push("phi_alarms_total", alarms.to_string());
693 s.push("stats_count", churn.to_string());
694 s.push("heartbeats_sent", "unavailable".to_string());
695 s.push("heartbeats_received", "unavailable".to_string());
696 s
697}
698
699#[must_use]
702pub fn memory_section(dyn_memory: i64) -> Section {
703 let mut s = Section::with_pairs("memory", &[]);
704 let rss = read_proc_status_kb("VmRSS").map_or_else(
705 || "unavailable".to_string(),
706 |kb| (kb.saturating_mul(1024)).to_string(),
707 );
708 let vms = read_proc_status_kb("VmSize").map_or_else(
709 || "unavailable".to_string(),
710 |kb| (kb.saturating_mul(1024)).to_string(),
711 );
712 s.push("rss_bytes", rss);
713 s.push("vms_bytes", vms);
714 s.push("hint_store_bytes", "unavailable".to_string());
715 s.push("mbuf_pool_bytes", dyn_memory.to_string());
716 s
717}
718
719#[must_use]
722pub fn fds_section() -> Section {
723 let mut s = Section::with_pairs("fds", &[]);
724 let total = match std::fs::read_dir("/proc/self/fd") {
725 Ok(rd) => rd.count().to_string(),
726 Err(_) => "unavailable".to_string(),
727 };
728 s.push("total", total);
729 s.push("listening", "unavailable".to_string());
730 s.push("peer_links", "unavailable".to_string());
731 s
732}
733
/// Reads `/proc/self/status` and returns the leading integer of the `field:`
/// line. For `Vm*` fields the kernel reports this value in kB.
///
/// Returns `None` in three cases: the file cannot be read (e.g. on non-Linux
/// platforms), the field is absent, or its value does not parse as `u64`.
fn read_proc_status_kb(field: &str) -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    parse_status_kb(&status, field)
}

/// Extracts the leading unsigned integer from the first `field:` line of
/// `status` (text in the `/proc/<pid>/status` format).
///
/// The key must match exactly, so `Rss` does not match a `RssAnon:` line.
fn parse_status_kb(status: &str, field: &str) -> Option<u64> {
    status
        .lines()
        .find_map(|line| line.strip_prefix(field)?.strip_prefix(':'))?
        .split_whitespace()
        .next()?
        .parse::<u64>()
        .ok()
}
751
/// Renders `ts_secs` (seconds since the Unix epoch) as a UTC timestamp of the
/// form `YYYY-MM-DDTHH:MM:SSZ`.
#[must_use]
pub fn render_timestamp(ts_secs: u64) -> String {
    let (year, month, day, hour, minute, second) = secs_to_components(ts_secs);
    let mut rendered = String::with_capacity(20);
    // Formatting into a `String` cannot fail, so the `fmt::Result` is ignored.
    let _ = write!(
        rendered,
        "{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z"
    );
    rendered
}

/// Splits epoch seconds into (year, month, day, hour, minute, second) in UTC.
fn secs_to_components(ts_secs: u64) -> (u64, u64, u64, u64, u64, u64) {
    const SECS_PER_DAY: u64 = 86_400;
    const SECS_PER_HOUR: u64 = 3_600;
    const SECS_PER_MINUTE: u64 = 60;

    let (days, secs_of_day) = (ts_secs / SECS_PER_DAY, ts_secs % SECS_PER_DAY);
    let hour = secs_of_day / SECS_PER_HOUR;
    let minute = secs_of_day % SECS_PER_HOUR / SECS_PER_MINUTE;
    let second = secs_of_day % SECS_PER_MINUTE;
    let (year, month, day) = days_to_civil(days);
    (year, month, day, hour, minute, second)
}

/// Converts days since 1970-01-01 to a proleptic-Gregorian (year, month, day).
///
/// This is Howard Hinnant's `civil_from_days`, specialised to non-negative
/// inputs. Years are counted from March so the leap day falls at the end
/// of the year.
fn days_to_civil(days_since_epoch: u64) -> (u64, u64, u64) {
    const DAYS_PER_ERA: u64 = 146_097; // days in a 400-year Gregorian cycle

    // Re-base the epoch from 1970-01-01 to 0000-03-01.
    let shifted = days_since_epoch + 719_468;
    let era = shifted / DAYS_PER_ERA;
    let doe = shifted % DAYS_PER_ERA; // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // month index, 0 = March .. 11 = February
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following civil year.
    let year = yoe + era * 400 + u64::from(month <= 2);
    (year, month, day)
}
801
802#[must_use]
805pub fn recent_events_section(events: &[RecentEvent]) -> RowSection {
806 let mut sec = RowSection::new("recent_events");
807 for e in events {
808 let detail = if e.detail.is_empty() {
809 String::new()
810 } else {
811 format!(" {}", e.detail)
812 };
813 sec.push(format!(
814 "{} {}{}",
815 render_timestamp(e.ts_secs),
816 e.kind,
817 detail
818 ));
819 }
820 sec
821}
822
823#[must_use]
832pub fn gather_from_handle(
833 handle: &ServerHandle,
834 pool: &ConfPool,
835 events: &RecentEvents,
836) -> ClusterInfoSnapshot {
837 let stats = handle.stats();
838 let ring = handle.ring();
839 let peers_view = handle.peers();
840 let tokens_per_node = peers_view
841 .iter()
842 .find(|p| p.is_local)
843 .map_or(0, |p| p.tokens.len());
844 let dist = pool.resolved_distribution().as_str();
845 let mut peer_rows = RowSection::new("peers");
846 let _now_secs = SystemTime::now()
847 .duration_since(UNIX_EPOCH)
848 .map_or(0, |d| d.as_secs());
849 for p in &peers_view {
850 let last_seen_ms: i64 = 0;
855 let row = peer_row(
856 &peer_label(p),
857 &p.dc,
858 &p.rack,
859 p.state,
860 phi_for(&stats, p.idx),
861 last_seen_ms,
862 p.tokens.len(),
863 );
864 peer_rows.push(row);
865 }
866 ClusterInfoSnapshot {
867 build: build_section(),
868 config: config_section(pool),
869 ring: ring_section(dist, ring.entries.len(), tokens_per_node),
870 peers: peer_rows,
871 queues: queues_section(&stats),
872 gossip: gossip_section(&stats),
873 recent_events: recent_events_section(&events.snapshot()),
874 memory: memory_section(stats.dyn_memory),
875 fds: fds_section(),
876 }
877}
878
879fn peer_label(p: &crate::embed::PeerSnapshot) -> String {
880 if p.is_local {
881 "local".to_string()
882 } else {
883 format!("{}:{}", p.host, p.port)
884 }
885}
886
887fn phi_for(stats: &crate::stats::Snapshot, peer_idx: u32) -> f64 {
888 stats
889 .failure
890 .peer_phi
891 .iter()
892 .find(|p| p.peer_idx == peer_idx)
893 .map_or(0.0, |p| p.phi)
894}
895
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn synthetic_snapshot_renders_all_sections() {
        let snap = ClusterInfoSnapshot::synthetic();
        let mut buf = Vec::new();
        format_text(&snap, &mut buf).expect("format");
        let text = String::from_utf8(buf).expect("ascii");
        for header in [
            "=== build ===",
            "=== config ===",
            "=== ring ===",
            "=== peers ===",
            "=== queues ===",
            "=== gossip ===",
            "=== recent_events ===",
            "=== memory ===",
            "=== fds ===",
        ] {
            assert!(text.contains(header), "missing {header}");
        }
        assert!(text.is_ascii());
    }

    #[test]
    fn config_section_redacts_requirepass() {
        let mut pool = ConfPool::default();
        pool.apply_defaults();
        pool.redis_requirepass = Some("super-secret-password".into());
        let sec = config_section(&pool);
        let pair = sec
            .pairs()
            .iter()
            .find(|(k, _)| k == "redis_requirepass")
            .expect("present");
        assert_eq!(pair.1, "redacted");
        let mut buf = Vec::new();
        let snap = ClusterInfoSnapshot {
            config: sec,
            ..ClusterInfoSnapshot::synthetic()
        };
        format_text(&snap, &mut buf).unwrap();
        let text = String::from_utf8(buf).unwrap();
        assert!(!text.contains("super-secret-password"));
    }

    #[test]
    fn ring_buffer_drops_oldest_when_full() {
        let log = RecentEvents::new();
        for i in 0..(MAX_RECENT_EVENTS + 5) {
            log.push(RecentEvent::new(i as u64, "tick", ""));
        }
        let snap = log.snapshot();
        assert_eq!(snap.len(), MAX_RECENT_EVENTS);
        assert_eq!(snap.first().unwrap().ts_secs, 5);
        assert_eq!(snap.last().unwrap().ts_secs, (MAX_RECENT_EVENTS + 4) as u64);
    }

    #[test]
    fn recent_events_len_tracks_pushes() {
        let log = RecentEvents::new();
        assert!(log.is_empty());
        assert_eq!(log.len(), 0);
        log.push(RecentEvent::new(1, "tick", ""));
        assert!(!log.is_empty());
        assert_eq!(log.len(), 1);
    }

    #[test]
    fn recent_events_clones_share_buffer() {
        let original = RecentEvents::new();
        let shared = original.clone();
        shared.push(RecentEvent::new(7, "tick", "x"));
        assert_eq!(original.snapshot(), vec![RecentEvent::new(7, "tick", "x")]);
    }

    #[test]
    fn timestamp_renders_known_epoch() {
        assert_eq!(render_timestamp(0), "1970-01-01T00:00:00Z");
        assert_eq!(render_timestamp(1_779_840_000), "2026-05-27T00:00:00Z");
    }

    #[test]
    fn timestamp_handles_leap_days_and_time_of_day() {
        assert_eq!(render_timestamp(86_399), "1970-01-01T23:59:59Z");
        assert_eq!(render_timestamp(951_782_400), "2000-02-29T00:00:00Z");
        assert_eq!(render_timestamp(1_709_251_199), "2024-02-29T23:59:59Z");
        assert_eq!(render_timestamp(4_102_444_800), "2100-01-01T00:00:00Z");
    }

    #[test]
    fn recent_events_section_formats_rows() {
        let sec = recent_events_section(&[
            RecentEvent::new(0, "restart", ""),
            RecentEvent::new(100, "join", "peer=a"),
        ]);
        assert_eq!(sec.name(), "recent_events");
        let rows: Vec<&str> = sec.rows().iter().map(String::as_str).collect();
        assert_eq!(
            rows,
            vec![
                "1970-01-01T00:00:00Z restart",
                "1970-01-01T00:01:40Z join peer=a"
            ]
        );
    }

    #[test]
    fn section_push_appends_in_order() {
        let mut s = Section::with_pairs("demo", &[("a", "1")]);
        s.push("b", "2");
        assert_eq!(s.name(), "demo");
        assert_eq!(
            s.pairs().to_vec(),
            vec![
                ("a".to_string(), "1".to_string()),
                ("b".to_string(), "2".to_string())
            ]
        );
    }

    #[test]
    fn pair_section_renders_header_pairs_and_blank_line() {
        let mut buf = Vec::new();
        write_pair_section(&mut buf, &Section::with_pairs("x", &[("k", "v")])).unwrap();
        assert_eq!(String::from_utf8(buf).unwrap(), "=== x ===\nk=v\n\n");
    }

    #[test]
    fn is_secret_reports_known_fields() {
        assert!(is_secret_config_field("redis_requirepass"));
        assert!(!is_secret_config_field("listen"));
    }

    #[test]
    fn peer_row_is_ascii_and_well_formed() {
        let row = peer_row(
            "arnold",
            "dc-arnold",
            "rack-1",
            PeerState::Normal,
            0.42,
            120,
            3,
        );
        assert!(row.is_ascii());
        assert!(row.contains("phi=0.42"));
        assert!(row.contains("status=NORMAL"));
        assert!(row.contains("tokens=3"));
    }

    #[test]
    fn build_profile_is_one_of_two_values() {
        let p = build_profile();
        assert!(p == "debug" || p == "release");
    }
}