Skip to main content

kbolt_core/
local.rs

1use std::collections::{HashMap, HashSet};
2use std::fs::{self, File};
3use std::io;
4use std::net::TcpListener;
5use std::path::{Path, PathBuf};
6use std::process::{Command, Stdio};
7use std::sync::{Mutex, OnceLock};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use hf_hub::api::sync::ApiBuilder;
12use hf_hub::{Repo, RepoType};
13use kbolt_types::{KboltError, LocalAction, LocalReport, LocalServiceReport};
14
15use crate::config::{
16    self, Config, EmbedderRoleConfig, ExpanderRoleConfig, ProviderOperation, ProviderProfileConfig,
17    RerankerRoleConfig,
18};
19use crate::models::{self, build_inference_clients_without_managed_recovery, EmbeddingInputKind};
20use crate::Result;
21
/// Application name; `default_cache_dir` appends it to the user cache directory.
const APP_NAME: &str = "kbolt";
/// Loopback host used by `endpoint_for_port` when building managed service URLs.
const LOCALHOST: &str = "127.0.0.1";
// NOTE(review): not referenced in this part of the file; presumably shown as an
// install hint when llama-server cannot be found — confirm against its use site.
const LLAMA_SERVER_BREW_HINT: &str = "brew install llama.cpp";
/// Value passed to llama-server's `--log-verbosity` flag.
const LLAMA_SERVER_LOG_VERBOSITY: &str = "1";
/// NOTE(review): despite the name, `start_or_reuse_service` uses this as the
/// deadline for a freshly spawned server to pass its readiness probe.
const MODEL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
// NOTE(review): not referenced in this part of the file; presumably bounds how
// long process termination is awaited — confirm against its use site.
const STOP_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
/// Parallel slots for the managed embedder: passed as `-np` and written to the
/// provider profile's `parallel_requests`.
pub(crate) const MANAGED_EMBEDDER_PARALLEL_REQUESTS: usize = 4;
/// Parallel slots for the managed reranker: passed as `-np` and written to the
/// provider profile's `parallel_requests`.
pub(crate) const MANAGED_RERANKER_PARALLEL_REQUESTS: usize = 4;
/// Context tokens per parallel slot: multiplied by the slot count for `-c` and
/// passed unchanged as `-ub` for the embedder and reranker.
const MANAGED_PARALLEL_CONTEXT_TOKENS: usize = 2048;

// Keys under which the managed services are stored in `Config::providers`.
const MANAGED_EMBED_PROVIDER: &str = "kbolt_local_embed";
pub(crate) const MANAGED_RERANK_PROVIDER: &str = "kbolt_local_rerank";
const MANAGED_EXPAND_PROVIDER: &str = "kbolt_local_expand";

// Model labels written to the provider profile's `model` field and to reports.
const EMBEDDER_MODEL_LABEL: &str = "embeddinggemma";
const RERANKER_MODEL_LABEL: &str = "qwen3-reranker";
const EXPANDER_MODEL_LABEL: &str = "qwen3-1.7b";
39
/// The inference role a managed llama-server instance fulfils.
///
/// The role selects the provider operation, the llama-server flags, and the
/// recovery lock used for the service.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ManagedRole {
    /// Embedding server (started with `--embedding`).
    Embedder,
    /// Reranking server (started with `--reranking`).
    Reranker,
    /// Chat-completion server used by the expander role.
    Expander,
}
46
/// Static description of one kbolt-managed local service.
#[derive(Debug, Clone, Copy)]
struct ManagedServiceSpec {
    /// Role the service plays; drives command-line flags and config wiring.
    role: ManagedRole,
    /// Short service name; used in notes and for pid, log, and model paths.
    name: &'static str,
    /// Key of the service's entry in `Config::providers`.
    provider_name: &'static str,
    /// Model label written to the provider profile and to reports.
    model_label: &'static str,
    /// Hugging Face repository the model file is fetched from.
    model_repo: &'static str,
    /// File name of the model inside `model_repo` and in the local cache.
    model_file: &'static str,
    /// Port tried first when no port is configured for the provider.
    preferred_port: u16,
}
57
/// Managed embedder service: embeddinggemma on preferred port 8101.
const EMBEDDER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Embedder,
    name: "embedder",
    provider_name: MANAGED_EMBED_PROVIDER,
    model_label: EMBEDDER_MODEL_LABEL,
    model_repo: "ggml-org/embeddinggemma-300M-GGUF",
    model_file: "embeddinggemma-300M-Q8_0.gguf",
    preferred_port: 8101,
};

/// Managed reranker service: qwen3-reranker on preferred port 8102.
const RERANKER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Reranker,
    name: "reranker",
    provider_name: MANAGED_RERANK_PROVIDER,
    model_label: RERANKER_MODEL_LABEL,
    model_repo: "ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF",
    model_file: "qwen3-reranker-0.6b-q8_0.gguf",
    preferred_port: 8102,
};

/// Managed expander service: qwen3-1.7b on preferred port 8103.
const EXPANDER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Expander,
    name: "expander",
    provider_name: MANAGED_EXPAND_PROVIDER,
    model_label: EXPANDER_MODEL_LABEL,
    model_repo: "Qwen/Qwen3-1.7B-GGUF",
    model_file: "Qwen3-1.7B-Q8_0.gguf",
    preferred_port: 8103,
};
87
// One lazily initialised mutex per role; `restart_managed_service` holds the
// matching lock for the duration of a recovery attempt.
static EMBEDDER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
static RERANKER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
static EXPANDER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
91
92pub(crate) fn is_managed_provider_name(provider_name: &str) -> bool {
93    managed_service_spec(provider_name).is_some()
94}
95
96pub(crate) fn managed_provider_label(provider_name: &str) -> Option<&'static str> {
97    managed_service_spec(provider_name).map(|spec| spec.name)
98}
99
100pub(crate) fn managed_provider_model_path(
101    cache_dir: &Path,
102    provider_name: &str,
103) -> Option<PathBuf> {
104    managed_service_spec(provider_name).map(|spec| managed_model_path(cache_dir, spec))
105}
106
107pub(crate) fn restart_managed_service(config: &Config, provider_name: &str) -> Result<()> {
108    let spec = managed_service_spec(provider_name).ok_or_else(|| {
109        KboltError::Config(format!(
110            "automatic recovery is only supported for managed local providers; got '{provider_name}'"
111        ))
112    })?;
113    let _guard = managed_service_recovery_lock(spec).lock().map_err(|_| {
114        KboltError::Internal(format!("managed {} recovery lock was poisoned", spec.name))
115    })?;
116
117    prepare_runtime_dirs(&config.cache_dir)?;
118    let llama_server_path = find_llama_server()?;
119    let port = provider_port(config, spec.provider_name)?;
120    let service = start_or_reuse_service(config, &llama_server_path, spec, port)?;
121    if service.ready {
122        return Ok(());
123    }
124
125    Err(KboltError::Inference(format!(
126        "managed {} on {} is running but not ready; run `kbolt local status`",
127        spec.name, service.endpoint
128    ))
129    .into())
130}
131
132pub fn setup_local(config_path: Option<&Path>) -> Result<LocalReport> {
133    let llama_server_path = find_llama_server()?;
134    let (mut config, mut notes) = load_setup_config(config_path)?;
135    let mut reserved_ports = HashSet::new();
136    let mut started = Vec::new();
137
138    prepare_runtime_dirs(&config.cache_dir)?;
139    let embedder = match prepare_service(
140        &config,
141        &llama_server_path,
142        &EMBEDDER_SPEC,
143        &mut reserved_ports,
144        &mut notes,
145        &mut started,
146    ) {
147        Ok(service) => service,
148        Err(err) => {
149            stop_started_children(&started);
150            return Err(err);
151        }
152    };
153    let reranker = match prepare_service(
154        &config,
155        &llama_server_path,
156        &RERANKER_SPEC,
157        &mut reserved_ports,
158        &mut notes,
159        &mut started,
160    ) {
161        Ok(service) => service,
162        Err(err) => {
163            stop_started_children(&started);
164            return Err(err);
165        }
166    };
167
168    apply_managed_service_config(&mut config, &EMBEDDER_SPEC, embedder.port);
169    apply_managed_service_config(&mut config, &RERANKER_SPEC, reranker.port);
170    if config.default_space.is_none() {
171        config.default_space = Some("default".to_string());
172    }
173
174    if let Err(err) = config::save(&config) {
175        stop_started_children(&started);
176        return Err(err);
177    }
178
179    build_report(
180        &config,
181        Some(llama_server_path),
182        LocalAction::Setup,
183        notes,
184        &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
185    )
186}
187
188fn load_setup_config(config_path: Option<&Path>) -> Result<(Config, Vec<String>)> {
189    let config_file = config::resolve_config_file_path(config_path)?;
190    match config::load(config_path) {
191        Ok(config) => Ok((config, Vec::new())),
192        Err(err @ crate::error::CoreError::Domain(KboltError::Config(_)))
193            if config_file.exists() =>
194        {
195            if !looks_like_legacy_config(&config_file)? {
196                return Err(err);
197            }
198            let backup_path = backup_invalid_config(&config_file)?;
199            let config = config::load(config_path)?;
200            Ok((
201                config,
202                vec![format!(
203                    "moved incompatible legacy config to {} and created a fresh current config",
204                    backup_path.display()
205                )],
206            ))
207        }
208        Err(err) => Err(err),
209    }
210}
211
212fn looks_like_legacy_config(config_file: &Path) -> Result<bool> {
213    let raw = fs::read_to_string(config_file)?;
214    Ok(raw
215        .lines()
216        .map(str::trim)
217        .any(|line| matches!(line, "[embeddings]" | "[models]")))
218}
219
220fn backup_invalid_config(config_file: &Path) -> Result<PathBuf> {
221    let Some(file_name) = config_file.file_name().and_then(|name| name.to_str()) else {
222        return Err(KboltError::Internal(format!(
223            "invalid config path: {}",
224            config_file.display()
225        ))
226        .into());
227    };
228    let parent = config_file.parent().ok_or_else(|| {
229        KboltError::Internal(format!(
230            "config file has no parent: {}",
231            config_file.display()
232        ))
233    })?;
234
235    for suffix in std::iter::once(".invalid.bak".to_string())
236        .chain((1usize..).map(|index| format!(".invalid.{index}.bak")))
237    {
238        let candidate = parent.join(format!("{file_name}{suffix}"));
239        if candidate.exists() {
240            continue;
241        }
242
243        fs::rename(config_file, &candidate)?;
244        return Ok(candidate);
245    }
246
247    unreachable!("backup suffix iterator is infinite");
248}
249
250pub fn enable_deep(config_path: Option<&Path>) -> Result<LocalReport> {
251    let llama_server_path = find_llama_server()?;
252    let mut config = config::load(config_path)?;
253    ensure_managed_local_base_configured(&config)?;
254
255    let mut reserved_ports = reserved_ports_from_config(&config);
256    let mut notes = Vec::new();
257    let mut started = Vec::new();
258    let expander = match prepare_service(
259        &config,
260        &llama_server_path,
261        &EXPANDER_SPEC,
262        &mut reserved_ports,
263        &mut notes,
264        &mut started,
265    ) {
266        Ok(service) => service,
267        Err(err) => {
268            stop_started_children(&started);
269            return Err(err);
270        }
271    };
272    apply_managed_service_config(&mut config, &EXPANDER_SPEC, expander.port);
273
274    if let Err(err) = config::save(&config) {
275        stop_started_children(&started);
276        return Err(err);
277    }
278
279    build_report(
280        &config,
281        Some(llama_server_path),
282        LocalAction::EnableDeep,
283        notes,
284        &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
285    )
286}
287
288pub fn start_local(config_path: Option<&Path>) -> Result<LocalReport> {
289    let llama_server_path = find_llama_server()?;
290    let config = config::load_existing(config_path)?;
291    ensure_managed_local_base_configured(&config)?;
292    prepare_runtime_dirs(&config.cache_dir)?;
293
294    let mut notes = Vec::new();
295    let specs = configured_specs(&config);
296    for spec in &specs {
297        let port = provider_port(&config, spec.provider_name)?;
298        let service = start_or_reuse_service(&config, &llama_server_path, spec, port)?;
299        if let Some(pid) = service.started_pid {
300            let _ = pid;
301            notes.push(format!("started {} on {}", spec.name, service.endpoint));
302        } else if service.ready {
303            notes.push(format!(
304                "{} already ready on {}",
305                spec.name, service.endpoint
306            ));
307        } else if service.running {
308            notes.push(format!(
309                "{} is already running on {} but is not managed by kbolt",
310                spec.name, service.endpoint
311            ));
312        }
313    }
314
315    build_report(
316        &config,
317        Some(llama_server_path),
318        LocalAction::Start,
319        notes,
320        &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
321    )
322}
323
324pub fn stop_local(config_path: Option<&Path>) -> Result<LocalReport> {
325    let config = config::load_existing(config_path)?;
326    let mut notes = Vec::new();
327
328    for spec in configured_specs(&config) {
329        let pid_file = pid_file_path(&config.cache_dir, spec);
330        let Some(pid) = read_pid(&pid_file)? else {
331            notes.push(format!("{} is not managed by a kbolt pid file", spec.name));
332            continue;
333        };
334
335        if !pid_is_alive(pid) {
336            remove_pid_file(&pid_file)?;
337            notes.push(format!("removed stale pid file for {}", spec.name));
338            continue;
339        }
340
341        terminate_pid(pid)?;
342        remove_pid_file(&pid_file)?;
343        notes.push(format!("stopped {}", spec.name));
344    }
345
346    build_report(
347        &config,
348        find_llama_server_optional(),
349        LocalAction::Stop,
350        notes,
351        &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
352    )
353}
354
355pub fn local_status(config_path: Option<&Path>) -> Result<LocalReport> {
356    let config_file = config::resolve_config_file_path(config_path)?;
357    let cache_dir = default_cache_dir()?;
358    if !config_file.exists() {
359        return Ok(LocalReport {
360            action: LocalAction::Status,
361            config_file,
362            cache_dir: cache_dir.clone(),
363            llama_server_path: find_llama_server_optional(),
364            ready: false,
365            notes: vec!["kbolt is not set up yet; run `kbolt setup local`.".to_string()],
366            services: vec![
367                missing_service_report(&cache_dir, &EMBEDDER_SPEC),
368                missing_service_report(&cache_dir, &RERANKER_SPEC),
369                missing_service_report(&cache_dir, &EXPANDER_SPEC),
370            ],
371        });
372    }
373
374    let config = config::load_existing(config_path)?;
375    build_report(
376        &config,
377        find_llama_server_optional(),
378        LocalAction::Status,
379        Vec::new(),
380        &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
381    )
382}
383
/// Outcome of `prepare_service`: the port selected for the service.
#[derive(Debug)]
struct PreparedService {
    /// Port the service was started or reused on.
    port: u16,
}
388
/// Outcome of `start_or_reuse_service` for one managed service.
#[derive(Debug)]
struct RunningService {
    /// Base URL of the service, as built by `endpoint_for_port`.
    endpoint: String,
    /// Whether something is serving on the port; every value built by
    /// `start_or_reuse_service` sets this to `true`.
    running: bool,
    /// Whether the readiness probe succeeded.
    ready: bool,
    /// Pid of the server spawned by this call; `None` when a service was reused.
    started_pid: Option<u32>,
}
396
397fn prepare_service(
398    config: &Config,
399    llama_server_path: &Path,
400    spec: &ManagedServiceSpec,
401    reserved_ports: &mut HashSet<u16>,
402    notes: &mut Vec<String>,
403    started: &mut Vec<u32>,
404) -> Result<PreparedService> {
405    ensure_model_file(&config.cache_dir, spec)?;
406    let port = select_port(config, spec, reserved_ports)?;
407    reserved_ports.insert(port);
408    let service = start_or_reuse_service(config, llama_server_path, spec, port)?;
409    if let Some(pid) = service.started_pid {
410        started.push(pid);
411        notes.push(started_service_note(spec.name, &service.endpoint));
412    } else if port != spec.preferred_port {
413        notes.push(format!(
414            "{} default port {} was unavailable; using {}",
415            spec.name, spec.preferred_port, port
416        ));
417    }
418    Ok(PreparedService { port })
419}
420
/// Formats the note recorded when a managed service has just been started.
fn started_service_note(name: &str, endpoint: &str) -> String {
    let mut note = String::from("started ");
    note.push_str(name);
    note.push_str(" on ");
    note.push_str(endpoint);
    note
}
424
425fn start_or_reuse_service(
426    config: &Config,
427    llama_server_path: &Path,
428    spec: &ManagedServiceSpec,
429    port: u16,
430) -> Result<RunningService> {
431    let endpoint = endpoint_for_port(port);
432    let pid_file = pid_file_path(&config.cache_dir, spec);
433    let model_path = ensure_model_file(&config.cache_dir, spec)?;
434
435    if let Some(pid) = read_pid(&pid_file)? {
436        if pid_is_alive(pid) {
437            if probe_service(config, spec, port).is_ok() {
438                return Ok(RunningService {
439                    endpoint,
440                    running: true,
441                    ready: true,
442                    started_pid: None,
443                });
444            }
445        } else {
446            remove_pid_file(&pid_file)?;
447        }
448    }
449
450    if is_port_bound(port) {
451        let ready = probe_service(config, spec, port).is_ok();
452        return Ok(RunningService {
453            endpoint,
454            running: true,
455            ready,
456            started_pid: None,
457        });
458    }
459
460    let log_file = log_file_path(&config.cache_dir, spec);
461    let child = spawn_llama_server(llama_server_path, spec, &model_path, port, &log_file)?;
462    write_pid(&pid_file, child.id())?;
463    let pid = child.id();
464    drop(child);
465
466    let start = Instant::now();
467    loop {
468        if probe_service(config, spec, port).is_ok() {
469            return Ok(RunningService {
470                endpoint,
471                running: true,
472                ready: true,
473                started_pid: Some(pid),
474            });
475        }
476
477        if start.elapsed() >= MODEL_DOWNLOAD_TIMEOUT {
478            let _ = terminate_pid(pid);
479            let _ = remove_pid_file(&pid_file);
480            return Err(KboltError::Inference(format!(
481                "{} did not become ready on {} within {}s; check {}",
482                spec.name,
483                endpoint,
484                MODEL_DOWNLOAD_TIMEOUT.as_secs(),
485                log_file.display()
486            ))
487            .into());
488        }
489
490        thread::sleep(Duration::from_millis(250));
491    }
492}
493
494fn build_report(
495    config: &Config,
496    llama_server_path: Option<PathBuf>,
497    action: LocalAction,
498    notes: Vec<String>,
499    specs: &[ManagedServiceSpec],
500) -> Result<LocalReport> {
501    let mut services = Vec::new();
502    let mut ready = true;
503
504    for spec in specs {
505        let provider = config.providers.get(spec.provider_name);
506        let configured = role_uses_provider(config, spec.provider_name);
507        let enabled = provider.is_some();
508        let model_path = managed_model_path(&config.cache_dir, spec);
509        let port = provider
510            .map(|profile| provider_profile_port(profile))
511            .transpose()?
512            .unwrap_or(spec.preferred_port);
513        let endpoint = endpoint_for_port(port);
514        let pid_file = pid_file_path(&config.cache_dir, spec);
515        let log_file = log_file_path(&config.cache_dir, spec);
516        let pid = read_pid(&pid_file)?;
517        let managed = pid.map(pid_is_alive).unwrap_or(false);
518        let running = is_port_bound(port);
519        let probe = if enabled {
520            probe_service(config, spec, port).err()
521        } else {
522            None
523        };
524        let service_ready = enabled && probe.is_none();
525        if enabled && !service_ready {
526            ready = false;
527        }
528
529        services.push(LocalServiceReport {
530            name: spec.name.to_string(),
531            provider: spec.provider_name.to_string(),
532            enabled,
533            configured,
534            managed,
535            running,
536            ready: service_ready,
537            model: spec.model_label.to_string(),
538            model_path,
539            endpoint,
540            port,
541            pid: pid.filter(|value| pid_is_alive(*value)),
542            pid_file,
543            log_file,
544            issue: if !enabled {
545                Some("service is not configured".to_string())
546            } else if service_ready {
547                if running && !managed {
548                    Some("service is reachable but not managed by kbolt".to_string())
549                } else {
550                    None
551                }
552            } else {
553                Some(
554                    probe
555                        .map(|err| err.to_string())
556                        .unwrap_or_else(|| "service is not ready".to_string()),
557                )
558            },
559        });
560    }
561
562    Ok(LocalReport {
563        action,
564        config_file: config.config_dir.join("index.toml"),
565        cache_dir: config.cache_dir.clone(),
566        llama_server_path,
567        ready,
568        notes,
569        services,
570    })
571}
572
573fn missing_service_report(cache_dir: &Path, spec: &ManagedServiceSpec) -> LocalServiceReport {
574    LocalServiceReport {
575        name: spec.name.to_string(),
576        provider: spec.provider_name.to_string(),
577        enabled: false,
578        configured: false,
579        managed: false,
580        running: false,
581        ready: false,
582        model: spec.model_label.to_string(),
583        model_path: managed_model_path(cache_dir, spec),
584        endpoint: endpoint_for_port(spec.preferred_port),
585        port: spec.preferred_port,
586        pid: None,
587        pid_file: pid_file_path(cache_dir, spec),
588        log_file: log_file_path(cache_dir, spec),
589        issue: Some("service is not configured".to_string()),
590    }
591}
592
593fn prepare_runtime_dirs(cache_dir: &Path) -> Result<()> {
594    fs::create_dir_all(cache_dir.join("models"))?;
595    fs::create_dir_all(cache_dir.join("run"))?;
596    fs::create_dir_all(cache_dir.join("logs"))?;
597    Ok(())
598}
599
600fn ensure_managed_local_base_configured(config: &Config) -> Result<()> {
601    if !config.providers.contains_key(MANAGED_EMBED_PROVIDER)
602        || !config.providers.contains_key(MANAGED_RERANK_PROVIDER)
603    {
604        return Err(KboltError::Config(
605            "managed local setup is not configured; run `kbolt setup local` first".to_string(),
606        )
607        .into());
608    }
609    Ok(())
610}
611
612fn managed_service_spec(provider_name: &str) -> Option<&'static ManagedServiceSpec> {
613    match provider_name {
614        MANAGED_EMBED_PROVIDER => Some(&EMBEDDER_SPEC),
615        MANAGED_RERANK_PROVIDER => Some(&RERANKER_SPEC),
616        MANAGED_EXPAND_PROVIDER => Some(&EXPANDER_SPEC),
617        _ => None,
618    }
619}
620
621fn managed_service_recovery_lock(spec: &ManagedServiceSpec) -> &'static Mutex<()> {
622    match spec.role {
623        ManagedRole::Embedder => EMBEDDER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
624        ManagedRole::Reranker => RERANKER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
625        ManagedRole::Expander => EXPANDER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
626    }
627}
628
629fn configured_specs(config: &Config) -> Vec<&'static ManagedServiceSpec> {
630    let mut specs = Vec::new();
631    if config.providers.contains_key(MANAGED_EMBED_PROVIDER) {
632        specs.push(&EMBEDDER_SPEC);
633    }
634    if config.providers.contains_key(MANAGED_RERANK_PROVIDER) {
635        specs.push(&RERANKER_SPEC);
636    }
637    if config.providers.contains_key(MANAGED_EXPAND_PROVIDER) {
638        specs.push(&EXPANDER_SPEC);
639    }
640    specs
641}
642
643fn role_uses_provider(config: &Config, provider_name: &str) -> bool {
644    config
645        .roles
646        .embedder
647        .as_ref()
648        .map(|role| role.provider == provider_name)
649        .unwrap_or(false)
650        || config
651            .roles
652            .reranker
653            .as_ref()
654            .map(|role| role.provider == provider_name)
655            .unwrap_or(false)
656        || config
657            .roles
658            .expander
659            .as_ref()
660            .map(|role| role.provider == provider_name)
661            .unwrap_or(false)
662}
663
664fn apply_managed_service_config(config: &mut Config, spec: &ManagedServiceSpec, port: u16) {
665    config.providers.insert(
666        spec.provider_name.to_string(),
667        ProviderProfileConfig::LlamaCppServer {
668            operation: match spec.role {
669                ManagedRole::Embedder => ProviderOperation::Embedding,
670                ManagedRole::Reranker => ProviderOperation::Reranking,
671                ManagedRole::Expander => ProviderOperation::ChatCompletion,
672            },
673            base_url: endpoint_for_port(port),
674            model: spec.model_label.to_string(),
675            parallel_requests: match spec.role {
676                ManagedRole::Embedder => Some(MANAGED_EMBEDDER_PARALLEL_REQUESTS),
677                ManagedRole::Reranker => Some(MANAGED_RERANKER_PARALLEL_REQUESTS),
678                ManagedRole::Expander => None,
679            },
680            timeout_ms: 30_000,
681            max_retries: 2,
682        },
683    );
684
685    match spec.role {
686        ManagedRole::Embedder => {
687            config.roles.embedder = Some(EmbedderRoleConfig {
688                provider: spec.provider_name.to_string(),
689                batch_size: 32,
690            });
691        }
692        ManagedRole::Reranker => {
693            config.roles.reranker = Some(RerankerRoleConfig {
694                provider: spec.provider_name.to_string(),
695            });
696        }
697        ManagedRole::Expander => {
698            config.roles.expander = Some(ExpanderRoleConfig {
699                provider: spec.provider_name.to_string(),
700                max_tokens: 600,
701                sampling: crate::config::ExpanderSamplingConfig::default(),
702            });
703        }
704    }
705}
706
707fn ensure_model_file(cache_dir: &Path, spec: &ManagedServiceSpec) -> Result<PathBuf> {
708    let model_path = managed_model_path(cache_dir, spec);
709    if model_path.is_file() {
710        return Ok(model_path);
711    }
712
713    let download_dir = cache_dir.join("models").join(spec.name);
714    fs::create_dir_all(&download_dir)?;
715
716    let api = ApiBuilder::new()
717        .with_cache_dir(download_dir.clone())
718        .build()
719        .map_err(|err| KboltError::ModelDownload(format!("{}: {err}", spec.model_repo)))?;
720    let repo = api.repo(Repo::new(spec.model_repo.to_string(), RepoType::Model));
721    let downloaded_path = repo
722        .get(spec.model_file)
723        .map_err(|err| KboltError::ModelDownload(format!("{}: {err}", spec.model_repo)))?;
724
725    if downloaded_path != model_path {
726        fs::copy(&downloaded_path, &model_path)?;
727    }
728
729    Ok(model_path)
730}
731
732fn managed_model_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
733    cache_dir
734        .join("models")
735        .join(spec.name)
736        .join(spec.model_file)
737}
738
739fn pid_file_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
740    cache_dir.join("run").join(format!("{}.pid", spec.name))
741}
742
743fn log_file_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
744    cache_dir.join("logs").join(format!("{}.log", spec.name))
745}
746
747fn endpoint_for_port(port: u16) -> String {
748    format!("http://{LOCALHOST}:{port}")
749}
750
751fn default_cache_dir() -> Result<PathBuf> {
752    let base = dirs::cache_dir()
753        .ok_or_else(|| KboltError::Config("unable to determine user cache directory".into()))?;
754    Ok(base.join(APP_NAME))
755}
756
757fn select_port(
758    config: &Config,
759    spec: &ManagedServiceSpec,
760    reserved_ports: &HashSet<u16>,
761) -> Result<u16> {
762    let preferred = config
763        .providers
764        .get(spec.provider_name)
765        .map(provider_profile_port)
766        .transpose()?
767        .unwrap_or(spec.preferred_port);
768
769    if port_candidate_usable(config, spec, preferred, reserved_ports)? {
770        return Ok(preferred);
771    }
772
773    for port in preferred..(preferred + 20) {
774        if port_candidate_usable(config, spec, port, reserved_ports)? {
775            return Ok(port);
776        }
777    }
778
779    Err(KboltError::Config(format!(
780        "no free local port found for {} near {}",
781        spec.name, preferred
782    ))
783    .into())
784}
785
786fn port_candidate_usable(
787    config: &Config,
788    spec: &ManagedServiceSpec,
789    port: u16,
790    reserved_ports: &HashSet<u16>,
791) -> Result<bool> {
792    if reserved_ports.contains(&port) {
793        return Ok(false);
794    }
795
796    let pid_file = pid_file_path(&config.cache_dir, spec);
797    if let Some(pid) = read_pid(&pid_file)? {
798        if pid_is_alive(pid) && provider_port_matches(config, spec.provider_name, port)? {
799            return Ok(true);
800        }
801    }
802
803    Ok(!is_port_bound(port))
804}
805
806fn provider_port_matches(config: &Config, provider_name: &str, port: u16) -> Result<bool> {
807    let Some(profile) = config.providers.get(provider_name) else {
808        return Ok(false);
809    };
810    Ok(provider_profile_port(profile)? == port)
811}
812
813fn reserved_ports_from_config(config: &Config) -> HashSet<u16> {
814    let mut ports = HashSet::new();
815    for provider_name in [
816        MANAGED_EMBED_PROVIDER,
817        MANAGED_RERANK_PROVIDER,
818        MANAGED_EXPAND_PROVIDER,
819    ] {
820        if let Some(profile) = config.providers.get(provider_name) {
821            if let Ok(port) = provider_profile_port(profile) {
822                ports.insert(port);
823            }
824        }
825    }
826    ports
827}
828
829fn provider_port(config: &Config, provider_name: &str) -> Result<u16> {
830    let profile = config.providers.get(provider_name).ok_or_else(|| {
831        KboltError::Config(format!("missing managed provider config: {provider_name}"))
832    })?;
833    provider_profile_port(profile)
834}
835
836fn provider_profile_port(profile: &ProviderProfileConfig) -> Result<u16> {
837    let base_url = profile.base_url();
838    let port = base_url
839        .rsplit(':')
840        .next()
841        .and_then(|raw| raw.parse::<u16>().ok())
842        .ok_or_else(|| KboltError::Config(format!("invalid managed local base_url: {base_url}")))?;
843    Ok(port)
844}
845
846fn open_managed_service_log(log_file: &Path) -> Result<File> {
847    if let Some(parent) = log_file.parent() {
848        fs::create_dir_all(parent)?;
849    }
850    File::options()
851        .create(true)
852        .write(true)
853        .truncate(true)
854        .open(log_file)
855        .map_err(Into::into)
856}
857
858fn configure_llama_server_command(
859    command: &mut Command,
860    llama_server_path: &Path,
861    spec: &ManagedServiceSpec,
862    model_path: &Path,
863    port: u16,
864) {
865    command
866        .arg(llama_server_path)
867        .arg("-m")
868        .arg(model_path)
869        .arg("--port")
870        .arg(port.to_string())
871        .arg("--log-verbosity")
872        .arg(LLAMA_SERVER_LOG_VERBOSITY);
873
874    match spec.role {
875        ManagedRole::Embedder => {
876            command
877                .arg("--embedding")
878                .arg("--pooling")
879                .arg("mean")
880                .arg("-ngl")
881                .arg("99")
882                .arg("-np")
883                .arg(MANAGED_EMBEDDER_PARALLEL_REQUESTS.to_string())
884                .arg("-c")
885                .arg(
886                    (MANAGED_PARALLEL_CONTEXT_TOKENS * MANAGED_EMBEDDER_PARALLEL_REQUESTS)
887                        .to_string(),
888                )
889                .arg("-ub")
890                .arg(MANAGED_PARALLEL_CONTEXT_TOKENS.to_string());
891        }
892        ManagedRole::Reranker => {
893            command
894                .arg("--reranking")
895                .arg("--pooling")
896                .arg("rank")
897                .arg("-ngl")
898                .arg("99")
899                .arg("-np")
900                .arg(MANAGED_RERANKER_PARALLEL_REQUESTS.to_string())
901                .arg("-c")
902                .arg(
903                    (MANAGED_PARALLEL_CONTEXT_TOKENS * MANAGED_RERANKER_PARALLEL_REQUESTS)
904                        .to_string(),
905                )
906                .arg("-ub")
907                .arg(MANAGED_PARALLEL_CONTEXT_TOKENS.to_string());
908        }
909        ManagedRole::Expander => {
910            command.arg("-ngl").arg("99").arg("-c").arg("2048");
911        }
912    }
913}
914
915fn spawn_llama_server(
916    llama_server_path: &Path,
917    spec: &ManagedServiceSpec,
918    model_path: &Path,
919    port: u16,
920    log_file: &Path,
921) -> Result<std::process::Child> {
922    let stdout = open_managed_service_log(log_file)?;
923    let stderr = stdout.try_clone()?;
924
925    let mut command = if Path::new("/usr/bin/nohup").is_file() {
926        Command::new("/usr/bin/nohup")
927    } else {
928        Command::new("nohup")
929    };
930    configure_llama_server_command(&mut command, llama_server_path, spec, model_path, port);
931
932    command
933        .stdin(Stdio::null())
934        .stdout(Stdio::from(stdout))
935        .stderr(Stdio::from(stderr))
936        .spawn()
937        .map_err(|err| {
938            KboltError::Inference(format!(
939                "failed to start {} via {}: {err}",
940                spec.name,
941                llama_server_path.display()
942            ))
943        })
944        .map_err(Into::into)
945}
946
947fn probe_service(config: &Config, spec: &ManagedServiceSpec, port: u16) -> Result<()> {
948    let mut probe_config = config.clone();
949    probe_config.providers = HashMap::new();
950    probe_config.roles.embedder = None;
951    probe_config.roles.reranker = None;
952    probe_config.roles.expander = None;
953    apply_managed_service_config(&mut probe_config, spec, port);
954
955    let status = models::status(&probe_config)?;
956    let info = match spec.role {
957        ManagedRole::Embedder => &status.embedder,
958        ManagedRole::Reranker => &status.reranker,
959        ManagedRole::Expander => &status.expander,
960    };
961    if !info.ready {
962        return Err(KboltError::Inference(
963            info.issue
964                .clone()
965                .unwrap_or_else(|| format!("{} is not ready", spec.name)),
966        )
967        .into());
968    }
969
970    let clients = build_inference_clients_without_managed_recovery(&probe_config)?;
971    match spec.role {
972        ManagedRole::Embedder => {
973            let embedder = clients.embedder.as_ref().ok_or_else(|| {
974                KboltError::Inference("managed embedder client was not built".to_string())
975            })?;
976            let vectors = embedder.embed_batch(
977                EmbeddingInputKind::Query,
978                &["kbolt local probe".to_string()],
979            )?;
980            if vectors.len() != 1 || vectors[0].is_empty() {
981                return Err(KboltError::Inference(
982                    "managed embedder smoke returned an invalid embedding".to_string(),
983                )
984                .into());
985            }
986            let sizer = clients.embedding_document_sizer.as_ref().ok_or_else(|| {
987                KboltError::Inference("managed embedder tokenizer client was not built".to_string())
988            })?;
989            let tokens = sizer.count_document_tokens("kbolt local probe")?;
990            if tokens == 0 {
991                return Err(KboltError::Inference(
992                    "managed embedder tokenize smoke returned zero tokens".to_string(),
993                )
994                .into());
995            }
996        }
997        ManagedRole::Reranker => {
998            let reranker = clients.reranker.as_ref().ok_or_else(|| {
999                KboltError::Inference("managed reranker client was not built".to_string())
1000            })?;
1001            let scores = reranker.rerank(
1002                "kbolt local probe",
1003                &["kbolt local rerank document".to_string()],
1004            )?;
1005            if scores.len() != 1 || !scores[0].is_finite() {
1006                return Err(KboltError::Inference(
1007                    "managed reranker smoke returned an invalid score".to_string(),
1008                )
1009                .into());
1010            }
1011        }
1012        ManagedRole::Expander => {
1013            let expander = clients.expander.as_ref().ok_or_else(|| {
1014                KboltError::Inference("managed expander client was not built".to_string())
1015            })?;
1016            let variants = expander.expand("kbolt local probe", 2)?;
1017            if variants.is_empty() {
1018                return Err(KboltError::Inference(
1019                    "managed expander smoke returned no variants".to_string(),
1020                )
1021                .into());
1022            }
1023        }
1024    }
1025
1026    Ok(())
1027}
1028
1029fn find_llama_server() -> Result<PathBuf> {
1030    find_llama_server_optional().ok_or_else(|| {
1031        KboltError::Config(format!(
1032            "llama-server was not found on PATH. Install llama.cpp and rerun setup. macOS hint: `{LLAMA_SERVER_BREW_HINT}`"
1033        ))
1034        .into()
1035    })
1036}
1037
/// Searches for a `llama-server` executable file.
///
/// Every directory listed in `PATH` is tried first, in order, followed by the
/// two fixed locations `/opt/homebrew/bin/llama-server` and
/// `/usr/local/bin/llama-server`. The first candidate that is a regular file
/// wins; `None` means no candidate exists.
fn find_llama_server_optional() -> Option<PathBuf> {
    const BINARY_NAME: &str = "llama-server";
    const FALLBACK_LOCATIONS: [&str; 2] = [
        "/opt/homebrew/bin/llama-server",
        "/usr/local/bin/llama-server",
    ];

    // Kept alive for the whole search because `split_paths` borrows from it.
    let path_var = std::env::var_os("PATH");
    let from_path = path_var
        .iter()
        .flat_map(|value| std::env::split_paths(value))
        .map(|dir| dir.join(BINARY_NAME));
    let fallbacks = FALLBACK_LOCATIONS.iter().copied().map(PathBuf::from);

    from_path
        .chain(fallbacks)
        .find(|candidate| candidate.is_file())
}
1050
/// Stores `pid` in the file at `path` as decimal text followed by a newline.
///
/// Missing parent directories are created first. Any previous file content is
/// replaced.
///
/// # Errors
///
/// Propagates I/O failures from creating the directories or writing the file.
fn write_pid(path: &Path, pid: u32) -> Result<()> {
    match path.parent() {
        Some(dir) => fs::create_dir_all(dir)?,
        None => {}
    }
    let contents = format!("{pid}\n");
    fs::write(path, contents).map_err(Into::into)
}
1058
1059fn read_pid(path: &Path) -> Result<Option<u32>> {
1060    if !path.exists() {
1061        return Ok(None);
1062    }
1063    let raw = fs::read_to_string(path)?;
1064    let pid = raw.trim().parse::<u32>().map_err(|err| {
1065        KboltError::Internal(format!("invalid pid file {}: {err}", path.display()))
1066    })?;
1067    Ok(Some(pid))
1068}
1069
/// Deletes the pid file at `path`; a file that is already gone is not an error.
///
/// # Errors
///
/// Propagates every removal failure other than `io::ErrorKind::NotFound`.
fn remove_pid_file(path: &Path) -> Result<()> {
    if let Err(err) = fs::remove_file(path) {
        if err.kind() != io::ErrorKind::NotFound {
            return Err(err.into());
        }
    }
    Ok(())
}
1077
/// Reports whether `kill -0 <pid>` succeeds.
///
/// The check shells out to the `kill` program with all standard streams
/// detached. A failure to run `kill` at all is treated the same as a
/// non-successful exit: the function returns `false`.
fn pid_is_alive(pid: u32) -> bool {
    let mut probe = Command::new("kill");
    probe
        .arg("-0")
        .arg(pid.to_string())
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match probe.status() {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
1089
1090fn terminate_pid(pid: u32) -> Result<()> {
1091    let term = Command::new("kill")
1092        .arg(pid.to_string())
1093        .stdin(Stdio::null())
1094        .stdout(Stdio::null())
1095        .stderr(Stdio::null())
1096        .status()
1097        .map_err(|err| KboltError::Internal(format!("failed to send SIGTERM to {pid}: {err}")))?;
1098    if !term.success() && pid_is_alive(pid) {
1099        return Err(KboltError::Internal(format!("failed to stop process {pid}")).into());
1100    }
1101
1102    let start = Instant::now();
1103    while pid_is_alive(pid) && start.elapsed() < STOP_WAIT_TIMEOUT {
1104        thread::sleep(Duration::from_millis(100));
1105    }
1106
1107    if pid_is_alive(pid) {
1108        let kill = Command::new("kill")
1109            .arg("-9")
1110            .arg(pid.to_string())
1111            .stdin(Stdio::null())
1112            .stdout(Stdio::null())
1113            .stderr(Stdio::null())
1114            .status()
1115            .map_err(|err| {
1116                KboltError::Internal(format!("failed to send SIGKILL to {pid}: {err}"))
1117            })?;
1118        if !kill.success() && pid_is_alive(pid) {
1119            return Err(KboltError::Internal(format!("failed to kill process {pid}")).into());
1120        }
1121    }
1122
1123    Ok(())
1124}
1125
1126fn is_port_bound(port: u16) -> bool {
1127    TcpListener::bind((LOCALHOST, port)).is_err()
1128}
1129
1130fn stop_started_children(started: &[u32]) {
1131    for pid in started {
1132        let _ = terminate_pid(*pid);
1133    }
1134}
1135
// Unit tests for the managed local service helpers: config binding, setup
// config recovery, port selection, model paths, log handling and the
// llama-server command line.
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::net::TcpListener;
    use std::path::Path;
    use std::process::Command;

    use tempfile::tempdir;

    use super::{
        apply_managed_service_config, configure_llama_server_command, endpoint_for_port,
        load_setup_config, managed_model_path, managed_provider_model_path, missing_service_report,
        open_managed_service_log, select_port, started_service_note, ManagedRole, EMBEDDER_SPEC,
        EXPANDER_SPEC, LLAMA_SERVER_LOG_VERBOSITY, MANAGED_EMBEDDER_PARALLEL_REQUESTS,
        MANAGED_PARALLEL_CONTEXT_TOKENS, MANAGED_RERANKER_PARALLEL_REQUESTS, RERANKER_SPEC,
    };
    use crate::config::{self, Config, ProviderProfileConfig};

    /// Loads a config from a fresh temporary `config` directory and points its
    /// cache at a sibling `cache` directory, which is created on disk.
    fn temp_config() -> Config {
        let tmp = tempdir().expect("tempdir");
        // NOTE(review): `keep()` presumably persists the directory past the
        // `TempDir` guard so the returned config stays usable; this also means
        // the directory is not cleaned up automatically — confirm.
        let root = tmp.keep();
        let config_dir = root.join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let mut config = config::load(Some(&config_dir)).expect("load config");
        config.cache_dir = root.join("cache");
        fs::create_dir_all(&config.cache_dir).expect("cache dir");
        config
    }

    /// Applying the embedder and reranker specs binds both roles to their
    /// managed providers and records the managed parallel request counts.
    #[test]
    fn setup_config_binds_managed_local_roles() {
        let mut config = temp_config();
        apply_managed_service_config(&mut config, &EMBEDDER_SPEC, 8101);
        apply_managed_service_config(&mut config, &RERANKER_SPEC, 8102);

        assert_eq!(
            config
                .roles
                .embedder
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_embed")
        );
        assert_eq!(
            config
                .roles
                .reranker
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_rerank")
        );
        assert!(config.providers.contains_key("kbolt_local_embed"));
        assert!(config.providers.contains_key("kbolt_local_rerank"));
        assert_eq!(
            config
                .providers
                .get("kbolt_local_embed")
                .and_then(ProviderProfileConfig::parallel_requests),
            Some(MANAGED_EMBEDDER_PARALLEL_REQUESTS)
        );
        assert_eq!(
            config
                .providers
                .get("kbolt_local_rerank")
                .and_then(ProviderProfileConfig::parallel_requests),
            Some(MANAGED_RERANKER_PARALLEL_REQUESTS)
        );
    }

    /// An `index.toml` using the `[embeddings]` layout is renamed to
    /// `index.toml.invalid.bak`, a note is emitted, and an empty config is
    /// returned instead of an error.
    #[test]
    fn load_setup_config_moves_incompatible_index_toml_aside() {
        let tmp = tempdir().expect("tempdir");
        let config_dir = tmp.path().join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let config_file = config_dir.join("index.toml");
        fs::write(
            &config_file,
            r#"
[embeddings]
provider = "legacy"
"#,
        )
        .expect("write invalid config");

        let (config, notes) =
            load_setup_config(Some(&config_dir)).expect("setup config should recover");

        let backup_path = config_dir.join("index.toml.invalid.bak");
        assert!(backup_path.is_file(), "backup should exist");
        assert!(
            notes
                .iter()
                .any(|note| note.contains("moved incompatible legacy config")),
            "expected recovery note, got {notes:?}"
        );
        assert!(
            config.providers.is_empty(),
            "fresh config should be created"
        );
        assert!(config.roles.embedder.is_none());
        assert!(config.roles.reranker.is_none());
    }

    /// An invalid config that is not in the legacy layout must surface as an
    /// error, stay in place, and not be backed up.
    #[test]
    fn load_setup_config_keeps_non_legacy_invalid_config_as_error() {
        let tmp = tempdir().expect("tempdir");
        let config_dir = tmp.path().join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let config_file = config_dir.join("index.toml");
        fs::write(
            &config_file,
            r#"
[providers.bad]
operation = "embedding"
"#,
        )
        .expect("write invalid config");

        let err = load_setup_config(Some(&config_dir)).expect_err("invalid config should fail");
        assert!(
            err.to_string().contains("invalid config file"),
            "unexpected error: {err}"
        );
        assert!(config_file.is_file(), "invalid file should remain in place");
        assert!(
            !config_dir.join("index.toml.invalid.bak").exists(),
            "non-legacy config should not be backed up automatically"
        );
    }

    /// Applying the expander spec binds the expander role to its managed
    /// provider and registers that provider.
    #[test]
    fn enable_deep_binds_managed_expander_role() {
        let mut config = temp_config();
        apply_managed_service_config(&mut config, &EXPANDER_SPEC, 8103);

        assert_eq!(
            config
                .roles
                .expander
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_expand")
        );
        assert!(config.providers.contains_key("kbolt_local_expand"));
    }

    /// When the configured embedder port is held by a live listener,
    /// `select_port` must pick a different port.
    #[test]
    fn select_port_skips_bound_ports() {
        let mut config = temp_config();
        // Port 0 lets the OS choose a free port; the listener keeps it occupied
        // for the duration of the test.
        let listener = TcpListener::bind(("127.0.0.1", 0)).expect("bind test port");
        let occupied_port = listener.local_addr().expect("local addr").port();
        apply_managed_service_config(&mut config, &EMBEDDER_SPEC, occupied_port);

        let port = select_port(&config, &EMBEDDER_SPEC, &Default::default()).expect("select port");
        assert_ne!(port, occupied_port);
    }

    /// The report for a service that was never configured is neither enabled
    /// nor ready and points at the endpoint for port 8101.
    #[test]
    fn missing_service_report_marks_unconfigured() {
        let cache = tempdir().expect("tempdir");
        let report = missing_service_report(cache.path(), &EMBEDDER_SPEC);
        assert!(!report.enabled);
        assert!(!report.ready);
        assert_eq!(report.endpoint, endpoint_for_port(8101));
    }

    /// Pins the on-disk layout `<cache>/models/<role>/<file>` for all three
    /// managed models, and checks the provider-name lookup for a managed and
    /// an unknown provider.
    #[test]
    fn managed_model_paths_are_stable() {
        let cache = tempdir().expect("tempdir");
        assert_eq!(
            managed_model_path(cache.path(), &EMBEDDER_SPEC),
            cache
                .path()
                .join("models")
                .join("embedder")
                .join("embeddinggemma-300M-Q8_0.gguf")
        );
        assert_eq!(
            managed_model_path(cache.path(), &RERANKER_SPEC),
            cache
                .path()
                .join("models")
                .join("reranker")
                .join("qwen3-reranker-0.6b-q8_0.gguf")
        );
        assert_eq!(
            managed_model_path(cache.path(), &EXPANDER_SPEC),
            cache
                .path()
                .join("models")
                .join("expander")
                .join("Qwen3-1.7B-Q8_0.gguf")
        );
        assert_eq!(
            managed_provider_model_path(cache.path(), EMBEDDER_SPEC.provider_name),
            Some(managed_model_path(cache.path(), &EMBEDDER_SPEC))
        );
        assert_eq!(
            managed_provider_model_path(cache.path(), "custom_embedder"),
            None
        );
    }

    /// Opening the managed log discards previous content instead of appending.
    #[test]
    fn managed_service_log_is_truncated_on_spawn_open() {
        let tmp = tempdir().expect("tempdir");
        let log_file = tmp.path().join("logs").join("embedder.log");
        fs::create_dir_all(log_file.parent().expect("log parent")).expect("create log dir");
        fs::write(&log_file, "old output\n").expect("write old log");

        // Scoped so the handle is closed before the file is read back.
        {
            let mut log = open_managed_service_log(&log_file).expect("open managed log");
            log.write_all(b"new output\n").expect("write new log");
        }

        let content = fs::read_to_string(&log_file).expect("read log");
        assert_eq!(content, "new output\n");
    }

    /// Every managed service command carries `--log-verbosity`; the embedder
    /// and reranker commands additionally carry the expected `-np`, `-c` and
    /// `-ub` values.
    #[test]
    fn llama_server_command_limits_managed_log_verbosity() {
        for spec in [&EMBEDDER_SPEC, &RERANKER_SPEC, &EXPANDER_SPEC] {
            // The command is only inspected, never spawned.
            let mut command = Command::new("nohup");
            configure_llama_server_command(
                &mut command,
                Path::new("/usr/local/bin/llama-server"),
                spec,
                Path::new("/tmp/model.gguf"),
                spec.preferred_port,
            );
            let args = command
                .get_args()
                .map(|arg| arg.to_string_lossy().into_owned())
                .collect::<Vec<_>>();

            // `windows(2)` checks that a flag is immediately followed by its value.
            assert!(
                args.windows(2).any(|pair| {
                    pair[0] == "--log-verbosity" && pair[1] == LLAMA_SERVER_LOG_VERBOSITY
                }),
                "expected log verbosity in args for {}: {args:?}",
                spec.name
            );
            if spec.role == ManagedRole::Embedder {
                // Total context is the per-request context times the number of
                // parallel requests.
                let context_tokens = (MANAGED_PARALLEL_CONTEXT_TOKENS
                    * MANAGED_EMBEDDER_PARALLEL_REQUESTS)
                    .to_string();
                let ubatch_tokens = MANAGED_PARALLEL_CONTEXT_TOKENS.to_string();
                assert!(
                    args.windows(2).any(|pair| {
                        pair[0] == "-np"
                            && pair[1] == MANAGED_EMBEDDER_PARALLEL_REQUESTS.to_string()
                    }),
                    "expected managed embedder parallelism in args: {args:?}"
                );
                assert!(
                    args.windows(2)
                        .any(|pair| pair[0] == "-c" && pair[1] == context_tokens),
                    "expected managed embedder context in args: {args:?}"
                );
                assert!(
                    args.windows(2)
                        .any(|pair| pair[0] == "-ub" && pair[1] == ubatch_tokens),
                    "expected managed embedder ubatch in args: {args:?}"
                );
            }
            if spec.role == ManagedRole::Reranker {
                // Same sizing rule as the embedder, using the reranker's
                // parallel request count.
                let context_tokens = (MANAGED_PARALLEL_CONTEXT_TOKENS
                    * MANAGED_RERANKER_PARALLEL_REQUESTS)
                    .to_string();
                let ubatch_tokens = MANAGED_PARALLEL_CONTEXT_TOKENS.to_string();
                assert!(
                    args.windows(2).any(|pair| {
                        pair[0] == "-np"
                            && pair[1] == MANAGED_RERANKER_PARALLEL_REQUESTS.to_string()
                    }),
                    "expected managed reranker parallelism in args: {args:?}"
                );
                assert!(
                    args.windows(2)
                        .any(|pair| pair[0] == "-c" && pair[1] == context_tokens),
                    "expected managed reranker context in args: {args:?}"
                );
                assert!(
                    args.windows(2)
                        .any(|pair| pair[0] == "-ub" && pair[1] == ubatch_tokens),
                    "expected managed reranker ubatch in args: {args:?}"
                );
            }
        }
    }

    /// The "started" note names the service and endpoint only; it must not
    /// contain a model file path.
    #[test]
    fn started_service_note_omits_model_path() {
        let note = started_service_note("embedder", "http://127.0.0.1:8101");
        assert_eq!(note, "started embedder on http://127.0.0.1:8101");
        assert!(!note.contains(".gguf"));
    }
}