Skip to main content

coding_agent_search/
topology_budget.rs

1//! Topology-aware advisory budgets for large indexing hosts.
2//!
3//! The planner is intentionally data-only: it reads Linux topology, compares
4//! against the current conservative defaults, and reports advisory CPU/RAM
5//! budgets without changing the live indexing controllers.
6
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use std::fs;
10use std::path::{Path, PathBuf};
11
/// Schema version stamped into the `schema_version` field of every plan.
pub const TOPOLOGY_BUDGET_SCHEMA_VERSION: &str = "1";

/// Number of bytes in one gibibyte.
const GIB: u64 = 1024 * 1024 * 1024;
/// Default cache cap (64 MiB) used when `MemAvailable` is unknown; also the
/// lower bound of the memory-derived default cache cap.
const DEFAULT_CACHE_BYTE_CAP_FALLBACK: usize = 64 * 1024 * 1024;
/// The default cache cap is `MemAvailable` divided by this value.
const DEFAULT_CACHE_BYTE_CAP_MEMORY_FRACTION_DENOMINATOR: u64 = 128;
/// Upper bound of the memory-derived default cache cap.
const DEFAULT_CACHE_BYTE_CAP_CEILING: u64 = 2 * GIB;
/// Upper bound applied to the memory-scaled advisory cache cap.
const TOPOLOGY_CACHE_BYTE_CAP_CEILING: u64 = 8 * GIB;
/// Lower bound of the memory-derived advisory in-flight byte budget.
const DEFAULT_MAX_INFLIGHT_FALLBACK: usize = 32 * 1024 * 1024;
/// Upper bound of the memory-derived advisory in-flight byte budget.
const TOPOLOGY_MAX_INFLIGHT_CEILING: u64 = 512 * 1024 * 1024;
21
/// Complete advisory report produced by the topology planner.
///
/// Holds the observed (or synthesized) topology, the advisory budgets derived
/// from it, and the defaults they were compared against.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyBudgetPlan {
    /// Always set to [`TOPOLOGY_BUDGET_SCHEMA_VERSION`] by the planner.
    pub schema_version: String,
    /// Host topology the plan was derived from.
    pub topology: TopologySnapshot,
    /// How many logical CPUs are held back from indexing, and why.
    pub reserved_core_policy: ReservedCorePolicy,
    /// Advisory worker and memory budgets.
    pub advisory_budgets: TopologyAdvisoryBudgets,
    /// The defaults supplied to the planner, echoed back for comparison.
    pub current_defaults: TopologyPlannerDefaults,
    /// `true` when topology could not be read and the defaults were echoed.
    pub fallback_active: bool,
    /// Human-readable summary of why this plan was chosen.
    pub decision_reason: String,
    /// Free-form notes describing the guarantees behind the plan.
    pub proof_notes: Vec<String>,
}
33
/// Point-in-time description of the host CPU and memory layout.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologySnapshot {
    /// Where the numbers came from: sysfs or the synthesized fallback.
    pub source: TopologySource,
    /// Coarse classification of the host.
    pub topology_class: TopologyClass,
    /// Online logical CPUs (sysfs), or the available parallelism in fallback.
    pub logical_cpus: usize,
    /// Distinct `(package id, core id)` pairs among the online CPUs.
    pub physical_cores: usize,
    /// Distinct physical package ids among the online CPUs.
    pub sockets: usize,
    /// NUMA nodes that contain at least one online CPU; 1 when unknown.
    pub numa_nodes: usize,
    /// Distinct level-3 cache sharing groups, never fewer than `sockets`.
    pub llc_groups: usize,
    /// Largest number of online logical CPUs observed on a single core.
    pub smt_threads_per_core: usize,
    /// `MemTotal` in bytes, when known.
    pub memory_total_bytes: Option<u64>,
    /// `MemAvailable` in bytes, when known.
    pub memory_available_bytes: Option<u64>,
}
47
/// Origin of a [`TopologySnapshot`]; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TopologySource {
    /// Read from the Linux sysfs tree.
    LinuxSysfs,
    /// Synthesized from the available parallelism because sysfs was unusable.
    Fallback,
}
54
/// Coarse host classification; serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TopologyClass {
    /// Used by the fallback snapshot, where no topology was derived.
    Unknown,
    /// One socket, one NUMA node, no SMT, below the many-core thresholds.
    SingleSocket,
    /// Like `SingleSocket`, but at least one core runs more than one thread.
    SingleSocketSmt,
    /// One socket and NUMA node with >= 32 physical cores or >= 64 logical CPUs.
    ManyCoreSingleSocket,
    /// More than one socket or more than one NUMA node.
    MultiSocketNuma,
}
64
/// Number of logical CPUs held back from indexing work, with its rationale.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReservedCorePolicy {
    /// Logical CPUs that the advisory budgets leave unused.
    pub reserved_cores: usize,
    /// Short description of the rule that produced `reserved_cores`.
    pub policy: String,
    /// Human-readable explanation of the reservation.
    pub reason: String,
}
71
/// Advisory worker counts and byte budgets suggested by the planner.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyAdvisoryBudgets {
    /// Suggested number of shard builders (planner caps this at 32).
    pub shard_builders: usize,
    /// Suggested number of merge workers (planner caps this at 16).
    pub merge_workers: usize,
    /// Suggested number of page-prep workers (planner caps this at 16).
    pub page_prep_workers: usize,
    /// Suggested number of semantic batchers (planner caps this at 16; the
    /// fallback plan uses 1).
    pub semantic_batchers: usize,
    /// Suggested cache cap in bytes.
    pub cache_cap_bytes: usize,
    /// Suggested cap on in-flight bytes.
    pub max_inflight_bytes: usize,
}
81
/// Current conservative settings that the planner compares against and never
/// undercuts for cache and in-flight byte budgets.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopologyPlannerDefaults {
    /// Parallelism currently assumed by the process (at least 1).
    pub available_parallelism: usize,
    /// Cores currently reserved (at most `available_parallelism - 1`).
    pub reserved_cores: usize,
    /// Current shard builder count (at least 1).
    pub shard_builders: usize,
    /// Current merge worker count (at least 1).
    pub merge_workers: usize,
    /// Current page-prep worker count (at least 1).
    pub page_prep_workers: usize,
    /// Current cache cap in bytes (at least 1).
    pub cache_cap_bytes: usize,
    /// Current in-flight byte cap (at least 1).
    pub max_inflight_bytes: usize,
}
92
/// Memory totals parsed from a meminfo-style file; each field is `None` when
/// the corresponding line was absent or unparseable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemorySnapshot {
    /// `MemTotal` converted to bytes.
    pub total_bytes: Option<u64>,
    /// `MemAvailable` converted to bytes.
    pub available_bytes: Option<u64>,
}
98
99impl TopologyPlannerDefaults {
100    pub fn conservative(
101        available_parallelism: usize,
102        reserved_cores: usize,
103        shard_builders: usize,
104        merge_workers: usize,
105        page_prep_workers: usize,
106        cache_cap_bytes: usize,
107        max_inflight_bytes: usize,
108    ) -> Self {
109        Self {
110            available_parallelism: available_parallelism.max(1),
111            reserved_cores: reserved_cores.min(available_parallelism.saturating_sub(1)),
112            shard_builders: shard_builders.max(1),
113            merge_workers: merge_workers.max(1),
114            page_prep_workers: page_prep_workers.max(1),
115            cache_cap_bytes: cache_cap_bytes.max(1),
116            max_inflight_bytes: max_inflight_bytes.max(1),
117        }
118    }
119
120    pub(crate) fn from_current_process() -> Self {
121        let pipeline = crate::indexer::lexical_rebuild_pipeline_settings_snapshot();
122        let memory = read_meminfo_snapshot(Path::new("/proc/meminfo")).unwrap_or(MemorySnapshot {
123            total_bytes: None,
124            available_bytes: None,
125        });
126        Self::conservative(
127            pipeline.available_parallelism,
128            pipeline.reserved_cores,
129            pipeline.staged_shard_builders,
130            pipeline.staged_merge_workers,
131            pipeline.page_prep_workers,
132            default_cache_cap_for_available(memory.available_bytes),
133            pipeline.pipeline_max_message_bytes_in_flight,
134        )
135    }
136}
137
138pub(crate) fn inspect_host_topology_budget() -> TopologyBudgetPlan {
139    let defaults = TopologyPlannerDefaults::from_current_process();
140    #[cfg(target_os = "linux")]
141    {
142        let memory = read_meminfo_snapshot(Path::new("/proc/meminfo")).unwrap_or(MemorySnapshot {
143            total_bytes: None,
144            available_bytes: None,
145        });
146        topology_budget_for_sysfs(Path::new("/sys"), memory, defaults)
147    }
148    #[cfg(not(target_os = "linux"))]
149    {
150        fallback_plan(
151            fallback_topology(None, defaults.available_parallelism),
152            defaults,
153            "linux sysfs topology is unavailable on this platform".to_string(),
154        )
155    }
156}
157
158pub fn topology_budget_for_sysfs(
159    sys_root: &Path,
160    memory: MemorySnapshot,
161    defaults: TopologyPlannerDefaults,
162) -> TopologyBudgetPlan {
163    match read_linux_sysfs_topology(sys_root, memory) {
164        Ok(topology) => plan_for_topology(topology, defaults),
165        Err(reason) => fallback_plan(
166            fallback_topology(Some(memory), defaults.available_parallelism),
167            defaults,
168            reason,
169        ),
170    }
171}
172
173pub fn read_meminfo_snapshot(path: &Path) -> Option<MemorySnapshot> {
174    let contents = fs::read_to_string(path).ok()?;
175    let mut total_bytes = None;
176    let mut available_bytes = None;
177    for line in contents.lines() {
178        if let Some(rest) = line.strip_prefix("MemTotal:") {
179            total_bytes = parse_meminfo_kib(rest);
180        } else if let Some(rest) = line.strip_prefix("MemAvailable:") {
181            available_bytes = parse_meminfo_kib(rest);
182        }
183    }
184    Some(MemorySnapshot {
185        total_bytes,
186        available_bytes,
187    })
188}
189
/// Parses the value part of a meminfo line (the text after the `:`) and
/// converts it from KiB to bytes.
///
/// Only the first whitespace-separated token is read. Returns `None` when the
/// token is missing, is not an unsigned integer, or overflows `u64` once
/// multiplied by 1024.
fn parse_meminfo_kib(rest: &str) -> Option<u64> {
    let first_token = rest.split_whitespace().next()?;
    let kib: u64 = first_token.parse().ok()?;
    kib.checked_mul(1024)
}
197
198fn read_linux_sysfs_topology(
199    sys_root: &Path,
200    memory: MemorySnapshot,
201) -> Result<TopologySnapshot, String> {
202    let cpu_root = sys_root.join("devices/system/cpu");
203    let online_cpus = read_online_cpus(&cpu_root)?;
204    if online_cpus.is_empty() {
205        return Err("linux sysfs reported no online CPUs".to_string());
206    }
207
208    let mut sockets = BTreeSet::new();
209    let mut core_threads: BTreeMap<(i64, i64), usize> = BTreeMap::new();
210    let mut llc_group_keys = BTreeSet::new();
211
212    for cpu in &online_cpus {
213        let topology_dir = cpu_root.join(format!("cpu{cpu}/topology"));
214        let package_id = read_i64(topology_dir.join("physical_package_id"))
215            .map_err(|err| format!("missing package topology for cpu{cpu}: {err}"))?;
216        let core_id = read_i64(topology_dir.join("core_id"))
217            .map_err(|err| format!("missing core topology for cpu{cpu}: {err}"))?;
218        sockets.insert(package_id);
219        *core_threads.entry((package_id, core_id)).or_default() += 1;
220        if let Some(group) =
221            read_llc_group_key(&cpu_root.join(format!("cpu{cpu}/cache")), package_id)
222        {
223            llc_group_keys.insert(group);
224        }
225    }
226
227    let physical_cores = core_threads.len().max(1);
228    let smt_threads_per_core = core_threads.values().copied().max().unwrap_or(1).max(1);
229    let sockets = sockets.len().max(1);
230    let numa_nodes = read_numa_node_count(sys_root, &online_cpus)
231        .unwrap_or(1)
232        .max(1);
233    let llc_groups = llc_group_keys.len().max(sockets);
234    let logical_cpus = online_cpus.len();
235    let topology_class = classify_topology(
236        sockets,
237        numa_nodes,
238        physical_cores,
239        logical_cpus,
240        smt_threads_per_core,
241    );
242
243    Ok(TopologySnapshot {
244        source: TopologySource::LinuxSysfs,
245        topology_class,
246        logical_cpus,
247        physical_cores,
248        sockets,
249        numa_nodes,
250        llc_groups,
251        smt_threads_per_core,
252        memory_total_bytes: memory.total_bytes,
253        memory_available_bytes: memory.available_bytes,
254    })
255}
256
257fn read_online_cpus(cpu_root: &Path) -> Result<BTreeSet<usize>, String> {
258    let online_path = cpu_root.join("online");
259    if let Ok(contents) = fs::read_to_string(&online_path) {
260        return parse_cpu_list(&contents)
261            .map_err(|err| format!("could not parse {}: {err}", online_path.display()));
262    }
263
264    let mut cpus = BTreeSet::new();
265    let entries = fs::read_dir(cpu_root)
266        .map_err(|err| format!("could not read {}: {err}", cpu_root.display()))?;
267    for entry in entries.flatten() {
268        let name = entry.file_name();
269        let Some(name) = name.to_str() else {
270            continue;
271        };
272        let Some(raw_id) = name.strip_prefix("cpu") else {
273            continue;
274        };
275        if let Ok(cpu) = raw_id.parse::<usize>() {
276            cpus.insert(cpu);
277        }
278    }
279    if cpus.is_empty() {
280        Err(format!(
281            "no cpuN directories found under {}",
282            cpu_root.display()
283        ))
284    } else {
285        Ok(cpus)
286    }
287}
288
/// Reads a file containing a single signed integer (surrounding whitespace is
/// ignored).
///
/// # Errors
///
/// Returns a message that names `path` both when the file cannot be read and
/// when its contents are not an integer, so callers can report which sysfs
/// attribute was at fault.
fn read_i64(path: PathBuf) -> Result<i64, String> {
    let raw = fs::read_to_string(&path)
        .map_err(|err| format!("could not read {}: {err}", path.display()))?;
    raw.trim()
        .parse::<i64>()
        .map_err(|err| format!("{} is not an integer: {err}", path.display()))
}
295
296fn read_llc_group_key(cache_root: &Path, package_id: i64) -> Option<String> {
297    let entries = fs::read_dir(cache_root).ok()?;
298    let mut fallback_id = None;
299    for entry in entries.flatten() {
300        let name = entry.file_name();
301        let name = name.to_string_lossy();
302        if !name.starts_with("index") {
303            continue;
304        }
305        let index_dir = entry.path();
306        let Ok(level) = fs::read_to_string(index_dir.join("level")) else {
307            continue;
308        };
309        if level.trim() != "3" {
310            continue;
311        }
312        if let Ok(cache_type) = fs::read_to_string(index_dir.join("type"))
313            && cache_type.trim() != "Unified"
314        {
315            continue;
316        }
317        if let Ok(shared) = fs::read_to_string(index_dir.join("shared_cpu_list"))
318            && let Ok(cpus) = parse_cpu_list(&shared)
319        {
320            return Some(format!("shared:{}", format_cpu_set(&cpus)));
321        }
322        if let Ok(id) = fs::read_to_string(index_dir.join("id")) {
323            fallback_id = Some(format!("id:{package_id}:{}", id.trim()));
324        }
325    }
326    fallback_id
327}
328
/// Counts the NUMA nodes that contain at least one online CPU.
///
/// Nodes with an empty `cpulist` (memory-only nodes) are skipped instead of
/// invalidating the whole count; without this, a single CPU-less node made the
/// host look like it had one NUMA node. An unreadable or malformed `cpulist`
/// still yields `None`, as does an unreadable node directory or a count of
/// zero, so the caller falls back to its default.
fn read_numa_node_count(sys_root: &Path, online_cpus: &BTreeSet<usize>) -> Option<usize> {
    let node_root = sys_root.join("devices/system/node");
    let entries = fs::read_dir(node_root).ok()?;
    let mut count = 0usize;
    for entry in entries.flatten() {
        let name = entry.file_name();
        if !name.to_string_lossy().starts_with("node") {
            continue;
        }
        let cpulist = fs::read_to_string(entry.path().join("cpulist")).ok()?;
        if cpulist.trim().is_empty() {
            // Memory-only node: it has no CPUs, so it cannot host workers.
            continue;
        }
        let cpus = parse_cpu_list(&cpulist).ok()?;
        if !cpus.is_disjoint(online_cpus) {
            count += 1;
        }
    }
    (count > 0).then_some(count)
}

/// Parses a kernel CPU list such as `0-3,8,10-11` into a set of CPU ids.
///
/// # Errors
///
/// Returns a message when an entry is not an unsigned integer, when a range is
/// descending, or when the list contains no CPUs at all.
fn parse_cpu_list(contents: &str) -> Result<BTreeSet<usize>, String> {
    let mut cpus = BTreeSet::new();
    for part in contents.trim().split(',').filter(|part| !part.is_empty()) {
        let part = part.trim();
        match part.split_once('-') {
            Some((start, end)) => {
                let first = start
                    .trim()
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list start {start:?}: {err}"))?;
                let last = end
                    .trim()
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list end {end:?}: {err}"))?;
                if first > last {
                    return Err(format!("invalid descending cpu range {first}-{last}"));
                }
                cpus.extend(first..=last);
            }
            None => {
                let cpu = part
                    .parse::<usize>()
                    .map_err(|err| format!("invalid cpu-list entry {part:?}: {err}"))?;
                cpus.insert(cpu);
            }
        }
    }
    if cpus.is_empty() {
        Err("cpu list is empty".to_string())
    } else {
        Ok(cpus)
    }
}
378
/// Renders a CPU set as its ids in ascending order, separated by commas and
/// without spaces. An empty set renders as the empty string.
fn format_cpu_set(cpus: &BTreeSet<usize>) -> String {
    let mut rendered = String::new();
    for (position, cpu) in cpus.iter().enumerate() {
        if position > 0 {
            rendered.push(',');
        }
        rendered.push_str(&cpu.to_string());
    }
    rendered
}
385
386fn classify_topology(
387    sockets: usize,
388    numa_nodes: usize,
389    physical_cores: usize,
390    logical_cpus: usize,
391    smt_threads_per_core: usize,
392) -> TopologyClass {
393    if sockets > 1 || numa_nodes > 1 {
394        TopologyClass::MultiSocketNuma
395    } else if physical_cores >= 32 || logical_cpus >= 64 {
396        TopologyClass::ManyCoreSingleSocket
397    } else if smt_threads_per_core > 1 {
398        TopologyClass::SingleSocketSmt
399    } else {
400        TopologyClass::SingleSocket
401    }
402}
403
404fn plan_for_topology(
405    topology: TopologySnapshot,
406    defaults: TopologyPlannerDefaults,
407) -> TopologyBudgetPlan {
408    let reserved_core_policy = reserved_core_policy_for(&topology, defaults.reserved_cores);
409    let usable_logical = topology
410        .logical_cpus
411        .saturating_sub(reserved_core_policy.reserved_cores)
412        .max(1);
413    let physical_budget = topology.physical_cores.min(usable_logical).max(1);
414    let locality_groups = topology.numa_nodes.max(topology.llc_groups).max(1);
415
416    let shard_target = if physical_budget >= 64 {
417        physical_budget / 2
418    } else if physical_budget >= 32 {
419        physical_budget * 3 / 8
420    } else {
421        physical_budget / 3
422    }
423    .max(1);
424    let shard_builders = shard_target
425        .max(defaults.shard_builders)
426        .min(usable_logical)
427        .clamp(1, 32);
428
429    let merge_cap = usable_logical.div_ceil(4).clamp(1, 16);
430    let merge_workers = locality_groups
431        .saturating_mul(2)
432        .max(defaults.merge_workers)
433        .min(merge_cap)
434        .max(1);
435
436    let page_prep_workers = (physical_budget / 4)
437        .max(defaults.page_prep_workers)
438        .min(usable_logical)
439        .clamp(1, 16);
440
441    let semantic_divisor = if topology.smt_threads_per_core > 1 {
442        8
443    } else {
444        6
445    };
446    let semantic_batchers = physical_budget
447        .div_ceil(semantic_divisor)
448        .max(topology.numa_nodes)
449        .min(usable_logical)
450        .clamp(1, 16);
451
452    let cache_cap_bytes =
453        topology_cache_cap(defaults.cache_cap_bytes, topology.memory_available_bytes);
454    let max_inflight_bytes =
455        topology_max_inflight_bytes(defaults.max_inflight_bytes, topology.memory_available_bytes);
456
457    TopologyBudgetPlan {
458        schema_version: TOPOLOGY_BUDGET_SCHEMA_VERSION.to_string(),
459        fallback_active: false,
460        decision_reason: format!(
461            "planned from {:?}: {} logical CPUs, {} physical cores, {} socket(s), {} NUMA node(s), {} LLC group(s)",
462            topology.topology_class,
463            topology.logical_cpus,
464            topology.physical_cores,
465            topology.sockets,
466            topology.numa_nodes,
467            topology.llc_groups
468        ),
469        proof_notes: vec![
470            "advisory only: live controllers keep current conservative settings until explicitly wired".to_string(),
471            "CPU budgets prefer physical cores and LLC/NUMA locality over SMT oversubscription".to_string(),
472            "RAM caps scale only when MemAvailable is large enough to preserve broad host headroom".to_string(),
473        ],
474        topology,
475        reserved_core_policy,
476        advisory_budgets: TopologyAdvisoryBudgets {
477            shard_builders,
478            merge_workers,
479            page_prep_workers,
480            semantic_batchers,
481            cache_cap_bytes,
482            max_inflight_bytes,
483        },
484        current_defaults: defaults,
485    }
486}
487
488fn reserved_core_policy_for(
489    topology: &TopologySnapshot,
490    default_reserved_cores: usize,
491) -> ReservedCorePolicy {
492    let logical_cpus = topology.logical_cpus.max(1);
493    let locality_groups = topology.numa_nodes.max(topology.llc_groups).max(1);
494    let locality_reserve = if logical_cpus >= 64 {
495        locality_groups.saturating_mul(2)
496    } else {
497        locality_groups
498    };
499    let smt_reserve = if topology.smt_threads_per_core > 1 && logical_cpus >= 16 {
500        topology.smt_threads_per_core
501    } else {
502        0
503    };
504    let manycore_reserve = if logical_cpus >= 64 {
505        logical_cpus / 12
506    } else {
507        0
508    };
509    let reserved_cores = default_reserved_cores
510        .max(locality_reserve)
511        .max(smt_reserve)
512        .max(manycore_reserve)
513        .min(16)
514        .min(logical_cpus.saturating_sub(1));
515
516    ReservedCorePolicy {
517        reserved_cores,
518        policy: "max(default, locality*2_on_large_hosts, smt_width, logical/12) capped at 16"
519            .to_string(),
520        reason: format!(
521            "reserve {} of {} logical CPUs for interactive work, IO, and NUMA/LLC service headroom",
522            reserved_cores, logical_cpus
523        ),
524    }
525}
526
527fn topology_cache_cap(default_cache_cap_bytes: usize, available_bytes: Option<u64>) -> usize {
528    let Some(available_bytes) = available_bytes else {
529        return default_cache_cap_bytes;
530    };
531    if available_bytes < 128 * GIB {
532        return default_cache_cap_bytes;
533    }
534    let candidate = (available_bytes / 64).clamp(
535        default_cache_cap_bytes as u64,
536        TOPOLOGY_CACHE_BYTE_CAP_CEILING,
537    );
538    usize::try_from(candidate).unwrap_or(usize::MAX)
539}
540
541fn topology_max_inflight_bytes(
542    default_max_inflight_bytes: usize,
543    available_bytes: Option<u64>,
544) -> usize {
545    let Some(available_bytes) = available_bytes else {
546        return default_max_inflight_bytes;
547    };
548    let candidate = (available_bytes / 4096).clamp(
549        DEFAULT_MAX_INFLIGHT_FALLBACK as u64,
550        TOPOLOGY_MAX_INFLIGHT_CEILING,
551    );
552    usize::try_from(candidate)
553        .unwrap_or(usize::MAX)
554        .max(default_max_inflight_bytes)
555}
556
557fn default_cache_cap_for_available(available_bytes: Option<u64>) -> usize {
558    let Some(available_bytes) = available_bytes else {
559        return DEFAULT_CACHE_BYTE_CAP_FALLBACK;
560    };
561    let ceiling = usize::try_from(DEFAULT_CACHE_BYTE_CAP_CEILING).unwrap_or(usize::MAX);
562    let budget = available_bytes / DEFAULT_CACHE_BYTE_CAP_MEMORY_FRACTION_DENOMINATOR;
563    let budget = budget.min(DEFAULT_CACHE_BYTE_CAP_CEILING);
564    let budget = usize::try_from(budget).unwrap_or(ceiling);
565    budget.clamp(DEFAULT_CACHE_BYTE_CAP_FALLBACK, ceiling)
566}
567
568fn fallback_topology(
569    memory: Option<MemorySnapshot>,
570    available_parallelism: usize,
571) -> TopologySnapshot {
572    let memory = memory.unwrap_or(MemorySnapshot {
573        total_bytes: None,
574        available_bytes: None,
575    });
576    TopologySnapshot {
577        source: TopologySource::Fallback,
578        topology_class: TopologyClass::Unknown,
579        logical_cpus: available_parallelism.max(1),
580        physical_cores: available_parallelism.max(1),
581        sockets: 1,
582        numa_nodes: 1,
583        llc_groups: 1,
584        smt_threads_per_core: 1,
585        memory_total_bytes: memory.total_bytes,
586        memory_available_bytes: memory.available_bytes,
587    }
588}
589
590fn fallback_plan(
591    topology: TopologySnapshot,
592    defaults: TopologyPlannerDefaults,
593    reason: String,
594) -> TopologyBudgetPlan {
595    let reserved_core_policy = ReservedCorePolicy {
596        reserved_cores: defaults.reserved_cores,
597        policy: "current conservative default".to_string(),
598        reason: "topology could not be derived, so cass preserves existing worker and RAM defaults"
599            .to_string(),
600    };
601    TopologyBudgetPlan {
602        schema_version: TOPOLOGY_BUDGET_SCHEMA_VERSION.to_string(),
603        topology,
604        reserved_core_policy,
605        advisory_budgets: TopologyAdvisoryBudgets {
606            shard_builders: defaults.shard_builders,
607            merge_workers: defaults.merge_workers,
608            page_prep_workers: defaults.page_prep_workers,
609            semantic_batchers: 1,
610            cache_cap_bytes: defaults.cache_cap_bytes,
611            max_inflight_bytes: defaults.max_inflight_bytes,
612        },
613        current_defaults: defaults,
614        fallback_active: true,
615        decision_reason: format!("using conservative defaults: {reason}"),
616        proof_notes: vec![
617            "fallback is intentionally isomorphic to current defaults for live rebuild budgets"
618                .to_string(),
619            "no /sys-derived CPU locality assumptions are made in fallback mode".to_string(),
620        ],
621    }
622}
623
#[cfg(test)]
mod tests {
    //! Fixture-driven tests: each test builds a miniature sysfs tree in a
    //! temporary directory and checks the plan derived from it.

    use super::*;
    use std::path::Path;

    const GIB: u64 = 1024 * 1024 * 1024;

    /// Defaults resembling the conservative settings of a `cpus`-CPU host.
    fn defaults(cpus: usize) -> TopologyPlannerDefaults {
        TopologyPlannerDefaults::conservative(
            cpus,
            (cpus / 8).clamp(1, 8).min(cpus.saturating_sub(1)),
            8.min(cpus.max(1)),
            3,
            6.min(cpus.max(1)),
            2 * GIB as usize,
            32 * 1024 * 1024,
        )
    }

    /// Memory snapshot with both figures given in GiB.
    fn memory(total_gib: u64, available_gib: u64) -> MemorySnapshot {
        MemorySnapshot {
            total_bytes: Some(total_gib * GIB),
            available_bytes: Some(available_gib * GIB),
        }
    }

    /// Writes `contents` to `path`, creating parent directories as needed.
    fn write(path: &Path, contents: &str) {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).expect("create parent");
        }
        fs::write(path, contents).expect("write fixture");
    }

    /// Adds a CPU with topology ids and a unified L3 cache that advertises
    /// `shared_cpu_list`.
    fn add_cpu(
        sys: &Path,
        cpu: usize,
        package_id: i64,
        core_id: i64,
        llc_id: usize,
        shared_cpu_list: &str,
    ) {
        // Everything except the sharing list is identical to the fixture
        // without one, so build on it instead of repeating the writes.
        add_cpu_without_shared_llc(sys, cpu, package_id, core_id, llc_id);
        write(
            &sys.join(format!(
                "devices/system/cpu/cpu{cpu}/cache/index3/shared_cpu_list"
            )),
            shared_cpu_list,
        );
    }

    /// Adds a CPU with topology ids and a unified L3 cache that exposes only
    /// its `id`, not a `shared_cpu_list`.
    fn add_cpu_without_shared_llc(
        sys: &Path,
        cpu: usize,
        package_id: i64,
        core_id: i64,
        llc_id: usize,
    ) {
        let cpu_root = sys.join(format!("devices/system/cpu/cpu{cpu}"));
        write(
            &cpu_root.join("topology/physical_package_id"),
            &package_id.to_string(),
        );
        write(&cpu_root.join("topology/core_id"), &core_id.to_string());
        write(&cpu_root.join("cache/index3/level"), "3\n");
        write(&cpu_root.join("cache/index3/type"), "Unified\n");
        write(&cpu_root.join("cache/index3/id"), &llc_id.to_string());
    }

    #[test]
    fn one_socket_fixture_reports_single_socket_budget() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-7\n");
        for cpu in 0..8 {
            add_cpu(sys, cpu, 0, cpu as i64, 0, "0-7\n");
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-7\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(8));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::SingleSocket);
        assert_eq!(plan.topology.logical_cpus, 8);
        assert_eq!(plan.topology.physical_cores, 8);
        assert_eq!(plan.topology.sockets, 1);
        assert_eq!(plan.topology.numa_nodes, 1);
        assert_eq!(plan.topology.llc_groups, 1);
        assert_eq!(plan.topology.smt_threads_per_core, 1);
        assert!(plan.advisory_budgets.shard_builders > 0);
        assert!(
            plan.advisory_budgets.shard_builders
                <= plan
                    .topology
                    .logical_cpus
                    .saturating_sub(plan.reserved_core_policy.reserved_cores)
                    .max(1)
        );
    }

    #[test]
    fn smt_fixture_reports_threads_per_core() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-7\n");
        for cpu in 0..8 {
            // Eight logical CPUs folded onto four core ids: two threads each.
            add_cpu(sys, cpu, 0, (cpu % 4) as i64, 0, "0-7\n");
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-7\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(8));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::SingleSocketSmt);
        assert_eq!(plan.topology.logical_cpus, 8);
        assert_eq!(plan.topology.physical_cores, 4);
        assert_eq!(plan.topology.smt_threads_per_core, 2);
    }

    #[test]
    fn two_socket_numa_fixture_expands_locality_aware_budgets() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-63\n");
        for cpu in 0..64 {
            let socket = if cpu < 32 { 0 } else { 1 };
            let shared = if cpu < 32 { "0-31\n" } else { "32-63\n" };
            add_cpu(sys, cpu, socket, (cpu % 32) as i64, socket as usize, shared);
        }
        write(&sys.join("devices/system/node/node0/cpulist"), "0-31\n");
        write(&sys.join("devices/system/node/node1/cpulist"), "32-63\n");

        let plan = topology_budget_for_sysfs(sys, memory(256, 224), defaults(64));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.topology_class, TopologyClass::MultiSocketNuma);
        assert_eq!(plan.topology.logical_cpus, 64);
        assert_eq!(plan.topology.physical_cores, 64);
        assert_eq!(plan.topology.sockets, 2);
        assert_eq!(plan.topology.numa_nodes, 2);
        assert_eq!(plan.topology.llc_groups, 2);
        assert_eq!(plan.reserved_core_policy.reserved_cores, 8);
        assert_eq!(plan.advisory_budgets.shard_builders, 21);
        assert_eq!(plan.advisory_budgets.merge_workers, 4);
        assert_eq!(plan.advisory_budgets.page_prep_workers, 14);
        assert_eq!(plan.advisory_budgets.semantic_batchers, 10);
        assert!(plan.advisory_budgets.cache_cap_bytes > plan.current_defaults.cache_cap_bytes);
    }

    #[test]
    fn llc_id_fallback_is_package_scoped_when_shared_cpu_list_is_missing() {
        let temp = tempfile::tempdir().expect("tempdir");
        let sys = temp.path();
        write(&sys.join("devices/system/cpu/online"), "0-3\n");
        // Both packages reuse cache id 0; they must still count as two groups.
        add_cpu_without_shared_llc(sys, 0, 0, 0, 0);
        add_cpu_without_shared_llc(sys, 1, 0, 1, 0);
        add_cpu_without_shared_llc(sys, 2, 1, 0, 0);
        add_cpu_without_shared_llc(sys, 3, 1, 1, 0);
        write(&sys.join("devices/system/node/node0/cpulist"), "0-1\n");
        write(&sys.join("devices/system/node/node1/cpulist"), "2-3\n");

        let plan = topology_budget_for_sysfs(sys, memory(64, 48), defaults(4));

        assert!(!plan.fallback_active);
        assert_eq!(plan.topology.sockets, 2);
        assert_eq!(plan.topology.llc_groups, 2);
    }

    #[test]
    fn missing_topology_preserves_conservative_defaults() {
        // An empty directory has no cpu tree, so the planner must fall back.
        let temp = tempfile::tempdir().expect("tempdir");
        let plan = topology_budget_for_sysfs(temp.path(), memory(256, 224), defaults(64));

        assert!(plan.fallback_active);
        assert_eq!(plan.topology.source, TopologySource::Fallback);
        assert_eq!(plan.topology.topology_class, TopologyClass::Unknown);
        assert_eq!(
            plan.advisory_budgets.shard_builders,
            plan.current_defaults.shard_builders
        );
        assert_eq!(
            plan.advisory_budgets.merge_workers,
            plan.current_defaults.merge_workers
        );
        assert_eq!(
            plan.advisory_budgets.page_prep_workers,
            plan.current_defaults.page_prep_workers
        );
        assert_eq!(
            plan.advisory_budgets.cache_cap_bytes,
            plan.current_defaults.cache_cap_bytes
        );
    }

    #[test]
    fn meminfo_parser_reads_total_and_available_kib() {
        let temp = tempfile::tempdir().expect("tempdir");
        let path = temp.path().join("meminfo");
        write(
            &path,
            "MemTotal:       268435456 kB\nMemAvailable:   234881024 kB\n",
        );

        let snapshot = read_meminfo_snapshot(&path).expect("meminfo snapshot");

        assert_eq!(snapshot.total_bytes, Some(256 * GIB));
        assert_eq!(snapshot.available_bytes, Some(224 * GIB));
    }
}