1use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use std::fs;
10use std::path::{Path, PathBuf};
11
/// Version tag stored in every [`TopologyBudgetPlan`] as `schema_version`.
pub const TOPOLOGY_BUDGET_SCHEMA_VERSION: &str = "1";

/// One gibibyte (2^30 bytes).
const GIB: u64 = 1 << 30;
/// Cache cap used when `MemAvailable` is unknown; also the floor of the derived default cap.
const DEFAULT_CACHE_BYTE_CAP_FALLBACK: usize = 64 << 20;
/// The default cache cap starts as `MemAvailable / DEFAULT_CACHE_BYTE_CAP_MEMORY_FRACTION_DENOMINATOR`.
const DEFAULT_CACHE_BYTE_CAP_MEMORY_FRACTION_DENOMINATOR: u64 = 128;
/// Upper bound of the default cache cap.
const DEFAULT_CACHE_BYTE_CAP_CEILING: u64 = GIB * 2;
/// Upper bound of the topology-scaled advisory cache cap.
const TOPOLOGY_CACHE_BYTE_CAP_CEILING: u64 = GIB * 8;
/// Lower bound of the topology-scaled in-flight byte budget.
const DEFAULT_MAX_INFLIGHT_FALLBACK: usize = 32 << 20;
/// Upper bound of the topology-scaled in-flight byte budget.
const TOPOLOGY_MAX_INFLIGHT_CEILING: u64 = 512 << 20;
21
/// Advisory CPU and RAM budget plan derived from the host topology.
///
/// Built either from Linux sysfs (`fallback_active == false`) or, when the
/// topology cannot be derived, from the conservative defaults
/// (`fallback_active == true`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyBudgetPlan {
    /// Schema tag; the planners in this module set it to `TOPOLOGY_BUDGET_SCHEMA_VERSION`.
    pub schema_version: String,
    /// Host topology the plan was computed from.
    pub topology: TopologySnapshot,
    /// How many logical CPUs are held back, the formula used, and why.
    pub reserved_core_policy: ReservedCorePolicy,
    /// Recommended worker counts and byte caps (advisory only).
    pub advisory_budgets: TopologyAdvisoryBudgets,
    /// The defaults passed in to planning, kept for comparison.
    pub current_defaults: TopologyPlannerDefaults,
    /// `true` when topology could not be derived and conservative defaults were used.
    pub fallback_active: bool,
    /// Human-readable summary of what the plan was based on.
    pub decision_reason: String,
    /// Human-readable notes describing the planning guarantees.
    pub proof_notes: Vec<String>,
}
33
/// Point-in-time description of the host CPU and memory layout.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologySnapshot {
    /// Where the data came from (sysfs or fallback).
    pub source: TopologySource,
    /// Coarse classification of the layout.
    pub topology_class: TopologyClass,
    /// Online logical CPUs (sysfs), or the available parallelism in fallback mode.
    pub logical_cpus: usize,
    /// Distinct `(physical_package_id, core_id)` pairs; equals `logical_cpus` in fallback mode.
    pub physical_cores: usize,
    /// Distinct `physical_package_id` values (at least 1).
    pub sockets: usize,
    /// NUMA nodes holding at least one online CPU (1 when unknown).
    pub numa_nodes: usize,
    /// Distinct level-3 unified cache groups, never fewer than `sockets`.
    pub llc_groups: usize,
    /// Highest number of online hardware threads found on any single core.
    pub smt_threads_per_core: usize,
    /// `MemTotal` in bytes, if it was read.
    pub memory_total_bytes: Option<u64>,
    /// `MemAvailable` in bytes, if it was read.
    pub memory_available_bytes: Option<u64>,
}
47
/// Origin of a [`TopologySnapshot`]; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TopologySource {
    /// Read from a Linux sysfs tree.
    LinuxSysfs,
    /// Synthesized from the available parallelism because sysfs was unusable.
    Fallback,
}
54
/// Coarse host layout classification; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TopologyClass {
    /// Topology could not be derived (fallback mode).
    Unknown,
    /// One socket, one NUMA node, no SMT, fewer than 32 cores / 64 logical CPUs.
    SingleSocket,
    /// Like `SingleSocket`, but with more than one hardware thread per core.
    SingleSocketSmt,
    /// One socket and one NUMA node with >= 32 physical cores or >= 64 logical CPUs.
    ManyCoreSingleSocket,
    /// More than one socket or more than one NUMA node.
    MultiSocketNuma,
}
64
/// Number of logical CPUs held back from budgeted workers, with its rationale.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReservedCorePolicy {
    /// Logical CPUs excluded from the usable budget.
    pub reserved_cores: usize,
    /// Human-readable description of the formula that produced `reserved_cores`.
    pub policy: String,
    /// Human-readable explanation of the reservation.
    pub reason: String,
}
71
/// Recommended worker counts and byte caps produced by the planner.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyAdvisoryBudgets {
    /// Recommended shard-builder worker count.
    pub shard_builders: usize,
    /// Recommended merge worker count.
    pub merge_workers: usize,
    /// Recommended page-prep worker count.
    pub page_prep_workers: usize,
    /// Recommended semantic batcher count.
    pub semantic_batchers: usize,
    /// Recommended cache cap in bytes.
    pub cache_cap_bytes: usize,
    /// Recommended in-flight byte budget.
    pub max_inflight_bytes: usize,
}
81
/// Current conservative settings that the topology planner starts from.
///
/// Topology-derived worker budgets are raised to at least these values before
/// being capped, and fallback plans copy them verbatim.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyPlannerDefaults {
    /// Parallelism reported by the running process (at least 1).
    pub available_parallelism: usize,
    /// Logical CPUs reserved by the current settings.
    pub reserved_cores: usize,
    /// Current shard-builder worker count.
    pub shard_builders: usize,
    /// Current merge worker count.
    pub merge_workers: usize,
    /// Current page-prep worker count.
    pub page_prep_workers: usize,
    /// Current cache cap in bytes.
    pub cache_cap_bytes: usize,
    /// Current in-flight byte budget.
    pub max_inflight_bytes: usize,
}
92
/// Memory totals parsed from a meminfo file (`MemTotal` / `MemAvailable`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemorySnapshot {
    /// `MemTotal` in bytes; `None` if absent or unparseable.
    pub total_bytes: Option<u64>,
    /// `MemAvailable` in bytes; `None` if absent or unparseable.
    pub available_bytes: Option<u64>,
}
98
99impl TopologyPlannerDefaults {
100 pub fn conservative(
101 available_parallelism: usize,
102 reserved_cores: usize,
103 shard_builders: usize,
104 merge_workers: usize,
105 page_prep_workers: usize,
106 cache_cap_bytes: usize,
107 max_inflight_bytes: usize,
108 ) -> Self {
109 Self {
110 available_parallelism: available_parallelism.max(1),
111 reserved_cores: reserved_cores.min(available_parallelism.saturating_sub(1)),
112 shard_builders: shard_builders.max(1),
113 merge_workers: merge_workers.max(1),
114 page_prep_workers: page_prep_workers.max(1),
115 cache_cap_bytes: cache_cap_bytes.max(1),
116 max_inflight_bytes: max_inflight_bytes.max(1),
117 }
118 }
119
120 pub(crate) fn from_current_process() -> Self {
121 let pipeline = crate::indexer::lexical_rebuild_pipeline_settings_snapshot();
122 let memory = read_meminfo_snapshot(Path::new("/proc/meminfo")).unwrap_or(MemorySnapshot {
123 total_bytes: None,
124 available_bytes: None,
125 });
126 Self::conservative(
127 pipeline.available_parallelism,
128 pipeline.reserved_cores,
129 pipeline.staged_shard_builders,
130 pipeline.staged_merge_workers,
131 pipeline.page_prep_workers,
132 default_cache_cap_for_available(memory.available_bytes),
133 pipeline.pipeline_max_message_bytes_in_flight,
134 )
135 }
136}
137
138pub(crate) fn inspect_host_topology_budget() -> TopologyBudgetPlan {
139 let defaults = TopologyPlannerDefaults::from_current_process();
140 #[cfg(target_os = "linux")]
141 {
142 let memory = read_meminfo_snapshot(Path::new("/proc/meminfo")).unwrap_or(MemorySnapshot {
143 total_bytes: None,
144 available_bytes: None,
145 });
146 topology_budget_for_sysfs(Path::new("/sys"), memory, defaults)
147 }
148 #[cfg(not(target_os = "linux"))]
149 {
150 fallback_plan(
151 fallback_topology(None, defaults.available_parallelism),
152 defaults,
153 "linux sysfs topology is unavailable on this platform".to_string(),
154 )
155 }
156}
157
158pub fn topology_budget_for_sysfs(
159 sys_root: &Path,
160 memory: MemorySnapshot,
161 defaults: TopologyPlannerDefaults,
162) -> TopologyBudgetPlan {
163 match read_linux_sysfs_topology(sys_root, memory) {
164 Ok(topology) => plan_for_topology(topology, defaults),
165 Err(reason) => fallback_plan(
166 fallback_topology(Some(memory), defaults.available_parallelism),
167 defaults,
168 reason,
169 ),
170 }
171}
172
173pub fn read_meminfo_snapshot(path: &Path) -> Option<MemorySnapshot> {
174 let contents = fs::read_to_string(path).ok()?;
175 let mut total_bytes = None;
176 let mut available_bytes = None;
177 for line in contents.lines() {
178 if let Some(rest) = line.strip_prefix("MemTotal:") {
179 total_bytes = parse_meminfo_kib(rest);
180 } else if let Some(rest) = line.strip_prefix("MemAvailable:") {
181 available_bytes = parse_meminfo_kib(rest);
182 }
183 }
184 Some(MemorySnapshot {
185 total_bytes,
186 available_bytes,
187 })
188}
189
/// Parses the first whitespace-separated field of a meminfo value as KiB and
/// converts it to bytes.
///
/// Returns `None` when the field is missing, not a `u64`, or overflows when
/// multiplied by 1024.
fn parse_meminfo_kib(rest: &str) -> Option<u64> {
    let first_field = rest.split_whitespace().next()?;
    let kib: u64 = first_field.parse().ok()?;
    kib.checked_mul(1024)
}
197
198fn read_linux_sysfs_topology(
199 sys_root: &Path,
200 memory: MemorySnapshot,
201) -> Result<TopologySnapshot, String> {
202 let cpu_root = sys_root.join("devices/system/cpu");
203 let online_cpus = read_online_cpus(&cpu_root)?;
204 if online_cpus.is_empty() {
205 return Err("linux sysfs reported no online CPUs".to_string());
206 }
207
208 let mut sockets = BTreeSet::new();
209 let mut core_threads: BTreeMap<(i64, i64), usize> = BTreeMap::new();
210 let mut llc_group_keys = BTreeSet::new();
211
212 for cpu in &online_cpus {
213 let topology_dir = cpu_root.join(format!("cpu{cpu}/topology"));
214 let package_id = read_i64(topology_dir.join("physical_package_id"))
215 .map_err(|err| format!("missing package topology for cpu{cpu}: {err}"))?;
216 let core_id = read_i64(topology_dir.join("core_id"))
217 .map_err(|err| format!("missing core topology for cpu{cpu}: {err}"))?;
218 sockets.insert(package_id);
219 *core_threads.entry((package_id, core_id)).or_default() += 1;
220 if let Some(group) =
221 read_llc_group_key(&cpu_root.join(format!("cpu{cpu}/cache")), package_id)
222 {
223 llc_group_keys.insert(group);
224 }
225 }
226
227 let physical_cores = core_threads.len().max(1);
228 let smt_threads_per_core = core_threads.values().copied().max().unwrap_or(1).max(1);
229 let sockets = sockets.len().max(1);
230 let numa_nodes = read_numa_node_count(sys_root, &online_cpus)
231 .unwrap_or(1)
232 .max(1);
233 let llc_groups = llc_group_keys.len().max(sockets);
234 let logical_cpus = online_cpus.len();
235 let topology_class = classify_topology(
236 sockets,
237 numa_nodes,
238 physical_cores,
239 logical_cpus,
240 smt_threads_per_core,
241 );
242
243 Ok(TopologySnapshot {
244 source: TopologySource::LinuxSysfs,
245 topology_class,
246 logical_cpus,
247 physical_cores,
248 sockets,
249 numa_nodes,
250 llc_groups,
251 smt_threads_per_core,
252 memory_total_bytes: memory.total_bytes,
253 memory_available_bytes: memory.available_bytes,
254 })
255}
256
257fn read_online_cpus(cpu_root: &Path) -> Result<BTreeSet<usize>, String> {
258 let online_path = cpu_root.join("online");
259 if let Ok(contents) = fs::read_to_string(&online_path) {
260 return parse_cpu_list(&contents)
261 .map_err(|err| format!("could not parse {}: {err}", online_path.display()));
262 }
263
264 let mut cpus = BTreeSet::new();
265 let entries = fs::read_dir(cpu_root)
266 .map_err(|err| format!("could not read {}: {err}", cpu_root.display()))?;
267 for entry in entries.flatten() {
268 let name = entry.file_name();
269 let Some(name) = name.to_str() else {
270 continue;
271 };
272 let Some(raw_id) = name.strip_prefix("cpu") else {
273 continue;
274 };
275 if let Ok(cpu) = raw_id.parse::<usize>() {
276 cpus.insert(cpu);
277 }
278 }
279 if cpus.is_empty() {
280 Err(format!(
281 "no cpuN directories found under {}",
282 cpu_root.display()
283 ))
284 } else {
285 Ok(cpus)
286 }
287}
288
/// Reads a file holding a single (whitespace-padded) signed integer.
///
/// Read failures return the I/O error text. Parse failures name the file.
fn read_i64(path: PathBuf) -> Result<i64, String> {
    match fs::read_to_string(&path) {
        Err(err) => Err(err.to_string()),
        Ok(raw) => raw
            .trim()
            .parse::<i64>()
            .map_err(|err| format!("{} is not an integer: {err}", path.display())),
    }
}
295
296fn read_llc_group_key(cache_root: &Path, package_id: i64) -> Option<String> {
297 let entries = fs::read_dir(cache_root).ok()?;
298 let mut fallback_id = None;
299 for entry in entries.flatten() {
300 let name = entry.file_name();
301 let name = name.to_string_lossy();
302 if !name.starts_with("index") {
303 continue;
304 }
305 let index_dir = entry.path();
306 let Ok(level) = fs::read_to_string(index_dir.join("level")) else {
307 continue;
308 };
309 if level.trim() != "3" {
310 continue;
311 }
312 if let Ok(cache_type) = fs::read_to_string(index_dir.join("type"))
313 && cache_type.trim() != "Unified"
314 {
315 continue;
316 }
317 if let Ok(shared) = fs::read_to_string(index_dir.join("shared_cpu_list"))
318 && let Ok(cpus) = parse_cpu_list(&shared)
319 {
320 return Some(format!("shared:{}", format_cpu_set(&cpus)));
321 }
322 if let Ok(id) = fs::read_to_string(index_dir.join("id")) {
323 fallback_id = Some(format!("id:{package_id}:{}", id.trim()));
324 }
325 }
326 fallback_id
327}
328
329fn read_numa_node_count(sys_root: &Path, online_cpus: &BTreeSet<usize>) -> Option<usize> {
330 let node_root = sys_root.join("devices/system/node");
331 let entries = fs::read_dir(node_root).ok()?;
332 let mut count = 0usize;
333 for entry in entries.flatten() {
334 let name = entry.file_name();
335 let name = name.to_string_lossy();
336 if !name.starts_with("node") {
337 continue;
338 }
339 let cpulist = fs::read_to_string(entry.path().join("cpulist")).ok()?;
340 let cpus = parse_cpu_list(&cpulist).ok()?;
341 if cpus.iter().any(|cpu| online_cpus.contains(cpu)) {
342 count += 1;
343 }
344 }
345 (count > 0).then_some(count)
346}
347
/// Parses a Linux cpu-list string such as `"0-3,8,10-11"` into a set of CPU ids.
///
/// Ranges are inclusive. Errors are returned for:
/// - non-numeric entries,
/// - descending ranges,
/// - a list that yields no CPUs at all (for example an empty file).
fn parse_cpu_list(contents: &str) -> Result<BTreeSet<usize>, String> {
    let mut parsed = BTreeSet::new();
    let pieces = contents.trim().split(',').filter(|piece| !piece.is_empty());
    for raw_piece in pieces {
        let piece = raw_piece.trim();
        match piece.split_once('-') {
            None => {
                let cpu = piece
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list entry {piece:?}: {err}"))?;
                parsed.insert(cpu);
            }
            Some((lo_text, hi_text)) => {
                let lo = lo_text
                    .trim()
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list start {lo_text:?}: {err}"))?;
                let hi = hi_text
                    .trim()
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list end {hi_text:?}: {err}"))?;
                if lo > hi {
                    return Err(format!("invalid descending cpu range {lo}-{hi}"));
                }
                parsed.extend(lo..=hi);
            }
        }
    }
    if parsed.is_empty() {
        return Err("cpu list is empty".to_string());
    }
    Ok(parsed)
}
378
/// Renders a CPU set as ascending, comma-separated ids (e.g. `"0,2,3"`).
fn format_cpu_set(cpus: &BTreeSet<usize>) -> String {
    let mut joined = String::new();
    for cpu in cpus {
        if !joined.is_empty() {
            joined.push(',');
        }
        joined.push_str(&cpu.to_string());
    }
    joined
}
385
386fn classify_topology(
387 sockets: usize,
388 numa_nodes: usize,
389 physical_cores: usize,
390 logical_cpus: usize,
391 smt_threads_per_core: usize,
392) -> TopologyClass {
393 if sockets > 1 || numa_nodes > 1 {
394 TopologyClass::MultiSocketNuma
395 } else if physical_cores >= 32 || logical_cpus >= 64 {
396 TopologyClass::ManyCoreSingleSocket
397 } else if smt_threads_per_core > 1 {
398 TopologyClass::SingleSocketSmt
399 } else {
400 TopologyClass::SingleSocket
401 }
402}
403
404fn plan_for_topology(
405 topology: TopologySnapshot,
406 defaults: TopologyPlannerDefaults,
407) -> TopologyBudgetPlan {
408 let reserved_core_policy = reserved_core_policy_for(&topology, defaults.reserved_cores);
409 let usable_logical = topology
410 .logical_cpus
411 .saturating_sub(reserved_core_policy.reserved_cores)
412 .max(1);
413 let physical_budget = topology.physical_cores.min(usable_logical).max(1);
414 let locality_groups = topology.numa_nodes.max(topology.llc_groups).max(1);
415
416 let shard_target = if physical_budget >= 64 {
417 physical_budget / 2
418 } else if physical_budget >= 32 {
419 physical_budget * 3 / 8
420 } else {
421 physical_budget / 3
422 }
423 .max(1);
424 let shard_builders = shard_target
425 .max(defaults.shard_builders)
426 .min(usable_logical)
427 .clamp(1, 32);
428
429 let merge_cap = usable_logical.div_ceil(4).clamp(1, 16);
430 let merge_workers = locality_groups
431 .saturating_mul(2)
432 .max(defaults.merge_workers)
433 .min(merge_cap)
434 .max(1);
435
436 let page_prep_workers = (physical_budget / 4)
437 .max(defaults.page_prep_workers)
438 .min(usable_logical)
439 .clamp(1, 16);
440
441 let semantic_divisor = if topology.smt_threads_per_core > 1 {
442 8
443 } else {
444 6
445 };
446 let semantic_batchers = physical_budget
447 .div_ceil(semantic_divisor)
448 .max(topology.numa_nodes)
449 .min(usable_logical)
450 .clamp(1, 16);
451
452 let cache_cap_bytes =
453 topology_cache_cap(defaults.cache_cap_bytes, topology.memory_available_bytes);
454 let max_inflight_bytes =
455 topology_max_inflight_bytes(defaults.max_inflight_bytes, topology.memory_available_bytes);
456
457 TopologyBudgetPlan {
458 schema_version: TOPOLOGY_BUDGET_SCHEMA_VERSION.to_string(),
459 fallback_active: false,
460 decision_reason: format!(
461 "planned from {:?}: {} logical CPUs, {} physical cores, {} socket(s), {} NUMA node(s), {} LLC group(s)",
462 topology.topology_class,
463 topology.logical_cpus,
464 topology.physical_cores,
465 topology.sockets,
466 topology.numa_nodes,
467 topology.llc_groups
468 ),
469 proof_notes: vec![
470 "advisory only: live controllers keep current conservative settings until explicitly wired".to_string(),
471 "CPU budgets prefer physical cores and LLC/NUMA locality over SMT oversubscription".to_string(),
472 "RAM caps scale only when MemAvailable is large enough to preserve broad host headroom".to_string(),
473 ],
474 topology,
475 reserved_core_policy,
476 advisory_budgets: TopologyAdvisoryBudgets {
477 shard_builders,
478 merge_workers,
479 page_prep_workers,
480 semantic_batchers,
481 cache_cap_bytes,
482 max_inflight_bytes,
483 },
484 current_defaults: defaults,
485 }
486}
487
488fn reserved_core_policy_for(
489 topology: &TopologySnapshot,
490 default_reserved_cores: usize,
491) -> ReservedCorePolicy {
492 let logical_cpus = topology.logical_cpus.max(1);
493 let locality_groups = topology.numa_nodes.max(topology.llc_groups).max(1);
494 let locality_reserve = if logical_cpus >= 64 {
495 locality_groups.saturating_mul(2)
496 } else {
497 locality_groups
498 };
499 let smt_reserve = if topology.smt_threads_per_core > 1 && logical_cpus >= 16 {
500 topology.smt_threads_per_core
501 } else {
502 0
503 };
504 let manycore_reserve = if logical_cpus >= 64 {
505 logical_cpus / 12
506 } else {
507 0
508 };
509 let reserved_cores = default_reserved_cores
510 .max(locality_reserve)
511 .max(smt_reserve)
512 .max(manycore_reserve)
513 .min(16)
514 .min(logical_cpus.saturating_sub(1));
515
516 ReservedCorePolicy {
517 reserved_cores,
518 policy: "max(default, locality*2_on_large_hosts, smt_width, logical/12) capped at 16"
519 .to_string(),
520 reason: format!(
521 "reserve {} of {} logical CPUs for interactive work, IO, and NUMA/LLC service headroom",
522 reserved_cores, logical_cpus
523 ),
524 }
525}
526
527fn topology_cache_cap(default_cache_cap_bytes: usize, available_bytes: Option<u64>) -> usize {
528 let Some(available_bytes) = available_bytes else {
529 return default_cache_cap_bytes;
530 };
531 if available_bytes < 128 * GIB {
532 return default_cache_cap_bytes;
533 }
534 let candidate = (available_bytes / 64).clamp(
535 default_cache_cap_bytes as u64,
536 TOPOLOGY_CACHE_BYTE_CAP_CEILING,
537 );
538 usize::try_from(candidate).unwrap_or(usize::MAX)
539}
540
541fn topology_max_inflight_bytes(
542 default_max_inflight_bytes: usize,
543 available_bytes: Option<u64>,
544) -> usize {
545 let Some(available_bytes) = available_bytes else {
546 return default_max_inflight_bytes;
547 };
548 let candidate = (available_bytes / 4096).clamp(
549 DEFAULT_MAX_INFLIGHT_FALLBACK as u64,
550 TOPOLOGY_MAX_INFLIGHT_CEILING,
551 );
552 usize::try_from(candidate)
553 .unwrap_or(usize::MAX)
554 .max(default_max_inflight_bytes)
555}
556
557fn default_cache_cap_for_available(available_bytes: Option<u64>) -> usize {
558 let Some(available_bytes) = available_bytes else {
559 return DEFAULT_CACHE_BYTE_CAP_FALLBACK;
560 };
561 let ceiling = usize::try_from(DEFAULT_CACHE_BYTE_CAP_CEILING).unwrap_or(usize::MAX);
562 let budget = available_bytes / DEFAULT_CACHE_BYTE_CAP_MEMORY_FRACTION_DENOMINATOR;
563 let budget = budget.min(DEFAULT_CACHE_BYTE_CAP_CEILING);
564 let budget = usize::try_from(budget).unwrap_or(ceiling);
565 budget.clamp(DEFAULT_CACHE_BYTE_CAP_FALLBACK, ceiling)
566}
567
568fn fallback_topology(
569 memory: Option<MemorySnapshot>,
570 available_parallelism: usize,
571) -> TopologySnapshot {
572 let memory = memory.unwrap_or(MemorySnapshot {
573 total_bytes: None,
574 available_bytes: None,
575 });
576 TopologySnapshot {
577 source: TopologySource::Fallback,
578 topology_class: TopologyClass::Unknown,
579 logical_cpus: available_parallelism.max(1),
580 physical_cores: available_parallelism.max(1),
581 sockets: 1,
582 numa_nodes: 1,
583 llc_groups: 1,
584 smt_threads_per_core: 1,
585 memory_total_bytes: memory.total_bytes,
586 memory_available_bytes: memory.available_bytes,
587 }
588}
589
590fn fallback_plan(
591 topology: TopologySnapshot,
592 defaults: TopologyPlannerDefaults,
593 reason: String,
594) -> TopologyBudgetPlan {
595 let reserved_core_policy = ReservedCorePolicy {
596 reserved_cores: defaults.reserved_cores,
597 policy: "current conservative default".to_string(),
598 reason: "topology could not be derived, so cass preserves existing worker and RAM defaults"
599 .to_string(),
600 };
601 TopologyBudgetPlan {
602 schema_version: TOPOLOGY_BUDGET_SCHEMA_VERSION.to_string(),
603 topology,
604 reserved_core_policy,
605 advisory_budgets: TopologyAdvisoryBudgets {
606 shard_builders: defaults.shard_builders,
607 merge_workers: defaults.merge_workers,
608 page_prep_workers: defaults.page_prep_workers,
609 semantic_batchers: 1,
610 cache_cap_bytes: defaults.cache_cap_bytes,
611 max_inflight_bytes: defaults.max_inflight_bytes,
612 },
613 current_defaults: defaults,
614 fallback_active: true,
615 decision_reason: format!("using conservative defaults: {reason}"),
616 proof_notes: vec![
617 "fallback is intentionally isomorphic to current defaults for live rebuild budgets"
618 .to_string(),
619 "no /sys-derived CPU locality assumptions are made in fallback mode".to_string(),
620 ],
621 }
622}
623
// Fixture-based tests: each test builds a fake sysfs tree in a temp directory
// and checks the topology snapshot and advisory budgets derived from it.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    // Test-local byte unit used to size memory fixtures.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// Planner defaults for a host with `cpus` logical CPUs:
    /// - `cpus / 8` reserved cores, clamped to 1..=8 and never all CPUs,
    /// - up to 8 shard builders,
    /// - 3 merge workers,
    /// - up to 6 page-prep workers,
    /// - a 2 GiB cache cap and a 32 MiB in-flight budget.
    fn defaults(cpus: usize) -> TopologyPlannerDefaults {
        TopologyPlannerDefaults::conservative(
            cpus,
            (cpus / 8).clamp(1, 8).min(cpus.saturating_sub(1)),
            8.min(cpus.max(1)),
            3,
            6.min(cpus.max(1)),
            2 * GIB as usize,
            32 * 1024 * 1024,
        )
    }

    /// Memory snapshot with the given total and available sizes in GiB.
    fn memory(total_gib: u64, available_gib: u64) -> MemorySnapshot {
        MemorySnapshot {
            total_bytes: Some(total_gib * GIB),
            available_bytes: Some(available_gib * GIB),
        }
    }

    /// Writes `contents` to `path`, creating parent directories first.
    fn write(path: &Path, contents: &str) {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).expect("create parent");
        }
        fs::write(path, contents).expect("write fixture");
    }

    /// Adds a fake `cpuN` sysfs entry with package/core ids and an `index3`
    /// unified L3 cache that carries both an `id` and a `shared_cpu_list`.
    fn add_cpu(
        sys: &Path,
        cpu: usize,
        package_id: i64,
        core_id: i64,
        llc_id: usize,
        shared_cpu_list: &str,
    ) {
        let cpu_root = sys.join(format!("devices/system/cpu/cpu{cpu}"));
        write(
            &cpu_root.join("topology/physical_package_id"),
            &package_id.to_string(),
        );
        write(&cpu_root.join("topology/core_id"), &core_id.to_string());
        write(&cpu_root.join("cache/index3/level"), "3\n");
        write(&cpu_root.join("cache/index3/type"), "Unified\n");
        write(&cpu_root.join("cache/index3/id"), &llc_id.to_string());
        write(
            &cpu_root.join("cache/index3/shared_cpu_list"),
            shared_cpu_list,
        );
    }

    /// Like `add_cpu`, but without `shared_cpu_list`, so LLC grouping must
    /// fall back to the cache `id` file.
    fn add_cpu_without_shared_llc(
        sys: &Path,
        cpu: usize,
        package_id: i64,
        core_id: i64,
        llc_id: usize,
    ) {
        let cpu_root = sys.join(format!("devices/system/cpu/cpu{cpu}"));
        write(
            &cpu_root.join("topology/physical_package_id"),
            &package_id.to_string(),
        );
        write(&cpu_root.join("topology/core_id"), &core_id.to_string());
        write(&cpu_root.join("cache/index3/level"), "3\n");
        write(&cpu_root.join("cache/index3/type"), "Unified\n");
        write(&cpu_root.join("cache/index3/id"), &llc_id.to_string());
    }

    // 8 CPUs, each on its own core, one package, one shared L3, one NUMA node.
    #[test]
    fn one_socket_fixture_reports_single_socket_budget() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-7\n");
        for cpu in 0..8 {
            add_cpu(sys, cpu, 0, cpu as i64, 0, "0-7\n");
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-7\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(8));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::SingleSocket);
        assert_eq!(plan.topology.logical_cpus, 8);
        assert_eq!(plan.topology.physical_cores, 8);
        assert_eq!(plan.topology.sockets, 1);
        assert_eq!(plan.topology.numa_nodes, 1);
        assert_eq!(plan.topology.llc_groups, 1);
        assert_eq!(plan.topology.smt_threads_per_core, 1);
        assert!(plan.advisory_budgets.shard_builders > 0);
        // Shard builders must fit in the CPUs left after reservation.
        assert!(
            plan.advisory_budgets.shard_builders
                <= plan
                    .topology
                    .logical_cpus
                    .saturating_sub(plan.reserved_core_policy.reserved_cores)
                    .max(1)
        );
    }

    // 8 CPUs mapped onto 4 core ids (cpu % 4), so each core has 2 threads.
    #[test]
    fn smt_fixture_reports_threads_per_core() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-7\n");
        for cpu in 0..8 {
            add_cpu(sys, cpu, 0, (cpu % 4) as i64, 0, "0-7\n");
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-7\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(8));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::SingleSocketSmt);
        assert_eq!(plan.topology.logical_cpus, 8);
        assert_eq!(plan.topology.physical_cores, 4);
        assert_eq!(plan.topology.smt_threads_per_core, 2);
    }

    // Two packages of 32 cores each, one L3 and one NUMA node per package.
    #[test]
    fn two_socket_numa_fixture_expands_locality_aware_budgets() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-63\n");
        for cpu in 0..64 {
            let socket = if cpu < 32 { 0 } else { 1 };
            let shared = if cpu < 32 { "0-31\n" } else { "32-63\n" };
            add_cpu(sys, cpu, socket, (cpu % 32) as i64, socket as usize, shared);
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-31\n");
        write(&sys.join("devices/system/node/node1/cpulist"), "32-63\n");

        let plan = topology_budget_for_sysfs(sys, memory(256, 224), defaults(64));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::MultiSocketNuma);
        assert_eq!(plan.topology.logical_cpus, 64);
        assert_eq!(plan.topology.physical_cores, 64);
        assert_eq!(plan.topology.sockets, 2);
        assert_eq!(plan.topology.numa_nodes, 2);
        assert_eq!(plan.topology.llc_groups, 2);
        // Reserved = max(default 8, 2 groups * 2, 64 / 12) = 8, leaving 56 usable.
        assert_eq!(plan.reserved_core_policy.reserved_cores, 8);
        // Shards: 56 * 3 / 8 = 21.
        assert_eq!(plan.advisory_budgets.shard_builders, 21);
        // Merge: min(2 groups * 2, ceil(56 / 4)) = 4.
        assert_eq!(plan.advisory_budgets.merge_workers, 4);
        // Page prep: 56 / 4 = 14.
        assert_eq!(plan.advisory_budgets.page_prep_workers, 14);
        // Semantic batchers (no SMT, divisor 6): ceil(56 / 6) = 10.
        assert_eq!(plan.advisory_budgets.semantic_batchers, 10);
        // 224 GiB available is above the 128 GiB threshold, so the cap grows past 2 GiB.
        assert!(plan.advisory_budgets.cache_cap_bytes > plan.current_defaults.cache_cap_bytes);
    }

    // Every CPU reports L3 id 0 but they sit in two packages; the id-based
    // fallback key includes the package id, so two LLC groups are counted.
    #[test]
    fn llc_id_fallback_is_package_scoped_when_shared_cpu_list_is_missing() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-3\n");
        add_cpu_without_shared_llc(sys, 0, 0, 0, 0);
        add_cpu_without_shared_llc(sys, 1, 0, 1, 0);
        add_cpu_without_shared_llc(sys, 2, 1, 0, 0);
        add_cpu_without_shared_llc(sys, 3, 1, 1, 0);
        write(&sys.join("devices/system/node/node0/cpulist"), "0-1\n");
        write(&sys.join("devices/system/node/node1/cpulist"), "2-3\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(4));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.sockets, 2);
        assert_eq!(plan.topology.llc_groups, 2);
    }

    // An empty sysfs root has no cpu directory, so planning falls back and
    // the advisory budgets must equal the passed-in defaults.
    #[test]
    fn missing_topology_preserves_conservative_defaults() {
        let temp = tempfile::tempdir().expect("tempdir");
        let plan = topology_budget_for_sysfs(temp.path(), memory(256, 224), defaults(64));

        assert!(plan.fallback_active);
        assert_eq!(plan.topology.source, TopologySource::Fallback);
        assert_eq!(plan.topology.topology_class, TopologyClass::Unknown);
        assert_eq!(
            plan.advisory_budgets.shard_builders,
            plan.current_defaults.shard_builders
        );
        assert_eq!(
            plan.advisory_budgets.merge_workers,
            plan.current_defaults.merge_workers
        );
        assert_eq!(
            plan.advisory_budgets.page_prep_workers,
            plan.current_defaults.page_prep_workers
        );
        assert_eq!(
            plan.advisory_budgets.cache_cap_bytes,
            plan.current_defaults.cache_cap_bytes
        );
    }

    // 268435456 kB = 256 GiB and 234881024 kB = 224 GiB.
    #[test]
    fn meminfo_parser_reads_total_and_available_kib() {
        let temp = tempfile::tempdir().expect("tempdir");
        let path = temp.path().join("meminfo");
        write(
            &path,
            "MemTotal: 268435456 kB\nMemAvailable: 234881024 kB\n",
        );

        let snapshot = read_meminfo_snapshot(&path).expect("meminfo snapshot");

        assert_eq!(snapshot.total_bytes, Some(256 * GIB));
        assert_eq!(snapshot.available_bytes, Some(224 * GIB));
    }
}