1use std::collections::{HashMap, HashSet};
2use std::fs::{self, File};
3use std::io;
4use std::net::TcpListener;
5use std::path::{Path, PathBuf};
6use std::process::{Command, Stdio};
7use std::sync::{Mutex, OnceLock};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use hf_hub::api::sync::ApiBuilder;
12use hf_hub::{Repo, RepoType};
13use kbolt_types::{KboltError, LocalAction, LocalReport, LocalServiceReport};
14
15use crate::config::{
16 self, Config, EmbedderRoleConfig, ExpanderRoleConfig, ProviderOperation, ProviderProfileConfig,
17 RerankerRoleConfig,
18};
19use crate::models::{self, build_inference_clients_without_managed_recovery, EmbeddingInputKind};
20use crate::Result;
21
/// Application name; `default_cache_dir` nests the cache under this directory name.
const APP_NAME: &str = "kbolt";
/// Loopback address used for managed endpoints and for port-availability checks.
const LOCALHOST: &str = "127.0.0.1";
/// Install hint shown when `llama-server` cannot be found.
const LLAMA_SERVER_BREW_HINT: &str = "brew install llama.cpp";
/// Value passed to `llama-server --log-verbosity`.
const LLAMA_SERVER_LOG_VERBOSITY: &str = "1";
/// How long `start_or_reuse_service` waits for a freshly spawned server to pass its probe.
/// NOTE(review): despite the name, this bounds server start-up/readiness; the model file
/// itself is fetched by `ensure_model_file` before the server is spawned.
const MODEL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
/// Grace period `terminate_pid` allows after SIGTERM before escalating to SIGKILL.
const STOP_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
/// Parallel slots (`-np`) for the managed embedder; also recorded as its `parallel_requests`.
pub(crate) const MANAGED_EMBEDDER_PARALLEL_REQUESTS: usize = 4;
/// Parallel slots (`-np`) for the managed reranker; also recorded as its `parallel_requests`.
pub(crate) const MANAGED_RERANKER_PARALLEL_REQUESTS: usize = 4;
/// Per-slot token budget: embedder/reranker get `-c` = this × slots and `-ub` = this.
const MANAGED_PARALLEL_CONTEXT_TOKENS: usize = 2048;

// Keys under which the managed services are stored in `config.providers`.
const MANAGED_EMBED_PROVIDER: &str = "kbolt_local_embed";
pub(crate) const MANAGED_RERANK_PROVIDER: &str = "kbolt_local_rerank";
const MANAGED_EXPAND_PROVIDER: &str = "kbolt_local_expand";

// Model names written into each managed provider profile and shown in reports.
const EMBEDDER_MODEL_LABEL: &str = "embeddinggemma";
const RERANKER_MODEL_LABEL: &str = "qwen3-reranker";
const EXPANDER_MODEL_LABEL: &str = "qwen3-1.7b";
39
/// Which inference role a managed `llama-server` instance serves.
///
/// The role selects the server flags, the provider operation, and the config role
/// slot the service is wired into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ManagedRole {
    /// Text embedding (started with `--embedding`).
    Embedder,
    /// Query/document reranking (started with `--reranking`).
    Reranker,
    /// Query expansion through a chat-completion provider.
    Expander,
}
46
/// Static description of one managed local service.
#[derive(Debug, Clone, Copy)]
struct ManagedServiceSpec {
    /// Role this service fulfils.
    role: ManagedRole,
    /// Short name used for pid/log/model-dir file names and in user-facing messages.
    name: &'static str,
    /// Key of this service's profile in `config.providers`.
    provider_name: &'static str,
    /// Model name written to the provider profile and shown in reports.
    model_label: &'static str,
    /// Hugging Face model repository the GGUF file is downloaded from.
    model_repo: &'static str,
    /// File name of the model inside `model_repo`.
    model_file: &'static str,
    /// Port tried first when the service has no configured port yet.
    preferred_port: u16,
}
57
/// Managed embedding server (EmbeddingGemma 300M, Q8_0) on port 8101 by default.
const EMBEDDER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Embedder,
    name: "embedder",
    provider_name: MANAGED_EMBED_PROVIDER,
    model_label: EMBEDDER_MODEL_LABEL,
    model_repo: "ggml-org/embeddinggemma-300M-GGUF",
    model_file: "embeddinggemma-300M-Q8_0.gguf",
    preferred_port: 8101,
};

/// Managed reranking server (Qwen3 Reranker 0.6B, Q8_0) on port 8102 by default.
const RERANKER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Reranker,
    name: "reranker",
    provider_name: MANAGED_RERANK_PROVIDER,
    model_label: RERANKER_MODEL_LABEL,
    model_repo: "ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF",
    model_file: "qwen3-reranker-0.6b-q8_0.gguf",
    preferred_port: 8102,
};

/// Managed query-expansion server (Qwen3 1.7B, Q8_0) on port 8103 by default.
/// Only configured by `enable_deep`.
const EXPANDER_SPEC: ManagedServiceSpec = ManagedServiceSpec {
    role: ManagedRole::Expander,
    name: "expander",
    provider_name: MANAGED_EXPAND_PROVIDER,
    model_label: EXPANDER_MODEL_LABEL,
    model_repo: "Qwen/Qwen3-1.7B-GGUF",
    model_file: "Qwen3-1.7B-Q8_0.gguf",
    preferred_port: 8103,
};
87
// One in-process lock per role. `restart_managed_service` holds the matching lock so
// concurrent recoveries of the same service within this process run one at a time.
// These are plain statics and do not coordinate across separate kbolt processes.
static EMBEDDER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
static RERANKER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
static EXPANDER_RECOVERY_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
91
92pub(crate) fn is_managed_provider_name(provider_name: &str) -> bool {
93 managed_service_spec(provider_name).is_some()
94}
95
96pub(crate) fn managed_provider_label(provider_name: &str) -> Option<&'static str> {
97 managed_service_spec(provider_name).map(|spec| spec.name)
98}
99
100pub(crate) fn managed_provider_model_path(
101 cache_dir: &Path,
102 provider_name: &str,
103) -> Option<PathBuf> {
104 managed_service_spec(provider_name).map(|spec| managed_model_path(cache_dir, spec))
105}
106
107pub(crate) fn restart_managed_service(config: &Config, provider_name: &str) -> Result<()> {
108 let spec = managed_service_spec(provider_name).ok_or_else(|| {
109 KboltError::Config(format!(
110 "automatic recovery is only supported for managed local providers; got '{provider_name}'"
111 ))
112 })?;
113 let _guard = managed_service_recovery_lock(spec).lock().map_err(|_| {
114 KboltError::Internal(format!("managed {} recovery lock was poisoned", spec.name))
115 })?;
116
117 prepare_runtime_dirs(&config.cache_dir)?;
118 let llama_server_path = find_llama_server()?;
119 let port = provider_port(config, spec.provider_name)?;
120 let service = start_or_reuse_service(config, &llama_server_path, spec, port)?;
121 if service.ready {
122 return Ok(());
123 }
124
125 Err(KboltError::Inference(format!(
126 "managed {} on {} is running but not ready; run `kbolt local status`",
127 spec.name, service.endpoint
128 ))
129 .into())
130}
131
132pub fn setup_local(config_path: Option<&Path>) -> Result<LocalReport> {
133 let llama_server_path = find_llama_server()?;
134 let (mut config, mut notes) = load_setup_config(config_path)?;
135 let mut reserved_ports = HashSet::new();
136 let mut started = Vec::new();
137
138 prepare_runtime_dirs(&config.cache_dir)?;
139 let embedder = match prepare_service(
140 &config,
141 &llama_server_path,
142 &EMBEDDER_SPEC,
143 &mut reserved_ports,
144 &mut notes,
145 &mut started,
146 ) {
147 Ok(service) => service,
148 Err(err) => {
149 stop_started_children(&started);
150 return Err(err);
151 }
152 };
153 let reranker = match prepare_service(
154 &config,
155 &llama_server_path,
156 &RERANKER_SPEC,
157 &mut reserved_ports,
158 &mut notes,
159 &mut started,
160 ) {
161 Ok(service) => service,
162 Err(err) => {
163 stop_started_children(&started);
164 return Err(err);
165 }
166 };
167
168 apply_managed_service_config(&mut config, &EMBEDDER_SPEC, embedder.port);
169 apply_managed_service_config(&mut config, &RERANKER_SPEC, reranker.port);
170 if config.default_space.is_none() {
171 config.default_space = Some("default".to_string());
172 }
173
174 if let Err(err) = config::save(&config) {
175 stop_started_children(&started);
176 return Err(err);
177 }
178
179 build_report(
180 &config,
181 Some(llama_server_path),
182 LocalAction::Setup,
183 notes,
184 &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
185 )
186}
187
188fn load_setup_config(config_path: Option<&Path>) -> Result<(Config, Vec<String>)> {
189 let config_file = config::resolve_config_file_path(config_path)?;
190 match config::load(config_path) {
191 Ok(config) => Ok((config, Vec::new())),
192 Err(err @ crate::error::CoreError::Domain(KboltError::Config(_)))
193 if config_file.exists() =>
194 {
195 if !looks_like_legacy_config(&config_file)? {
196 return Err(err);
197 }
198 let backup_path = backup_invalid_config(&config_file)?;
199 let config = config::load(config_path)?;
200 Ok((
201 config,
202 vec![format!(
203 "moved incompatible legacy config to {} and created a fresh current config",
204 backup_path.display()
205 )],
206 ))
207 }
208 Err(err) => Err(err),
209 }
210}
211
212fn looks_like_legacy_config(config_file: &Path) -> Result<bool> {
213 let raw = fs::read_to_string(config_file)?;
214 Ok(raw
215 .lines()
216 .map(str::trim)
217 .any(|line| matches!(line, "[embeddings]" | "[models]")))
218}
219
220fn backup_invalid_config(config_file: &Path) -> Result<PathBuf> {
221 let Some(file_name) = config_file.file_name().and_then(|name| name.to_str()) else {
222 return Err(KboltError::Internal(format!(
223 "invalid config path: {}",
224 config_file.display()
225 ))
226 .into());
227 };
228 let parent = config_file.parent().ok_or_else(|| {
229 KboltError::Internal(format!(
230 "config file has no parent: {}",
231 config_file.display()
232 ))
233 })?;
234
235 for suffix in std::iter::once(".invalid.bak".to_string())
236 .chain((1usize..).map(|index| format!(".invalid.{index}.bak")))
237 {
238 let candidate = parent.join(format!("{file_name}{suffix}"));
239 if candidate.exists() {
240 continue;
241 }
242
243 fs::rename(config_file, &candidate)?;
244 return Ok(candidate);
245 }
246
247 unreachable!("backup suffix iterator is infinite");
248}
249
250pub fn enable_deep(config_path: Option<&Path>) -> Result<LocalReport> {
251 let llama_server_path = find_llama_server()?;
252 let mut config = config::load(config_path)?;
253 ensure_managed_local_base_configured(&config)?;
254
255 let mut reserved_ports = reserved_ports_from_config(&config);
256 let mut notes = Vec::new();
257 let mut started = Vec::new();
258 let expander = match prepare_service(
259 &config,
260 &llama_server_path,
261 &EXPANDER_SPEC,
262 &mut reserved_ports,
263 &mut notes,
264 &mut started,
265 ) {
266 Ok(service) => service,
267 Err(err) => {
268 stop_started_children(&started);
269 return Err(err);
270 }
271 };
272 apply_managed_service_config(&mut config, &EXPANDER_SPEC, expander.port);
273
274 if let Err(err) = config::save(&config) {
275 stop_started_children(&started);
276 return Err(err);
277 }
278
279 build_report(
280 &config,
281 Some(llama_server_path),
282 LocalAction::EnableDeep,
283 notes,
284 &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
285 )
286}
287
288pub fn start_local(config_path: Option<&Path>) -> Result<LocalReport> {
289 let llama_server_path = find_llama_server()?;
290 let config = config::load_existing(config_path)?;
291 ensure_managed_local_base_configured(&config)?;
292 prepare_runtime_dirs(&config.cache_dir)?;
293
294 let mut notes = Vec::new();
295 let specs = configured_specs(&config);
296 for spec in &specs {
297 let port = provider_port(&config, spec.provider_name)?;
298 let service = start_or_reuse_service(&config, &llama_server_path, spec, port)?;
299 if let Some(pid) = service.started_pid {
300 let _ = pid;
301 notes.push(format!("started {} on {}", spec.name, service.endpoint));
302 } else if service.ready {
303 notes.push(format!(
304 "{} already ready on {}",
305 spec.name, service.endpoint
306 ));
307 } else if service.running {
308 notes.push(format!(
309 "{} is already running on {} but is not managed by kbolt",
310 spec.name, service.endpoint
311 ));
312 }
313 }
314
315 build_report(
316 &config,
317 Some(llama_server_path),
318 LocalAction::Start,
319 notes,
320 &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
321 )
322}
323
324pub fn stop_local(config_path: Option<&Path>) -> Result<LocalReport> {
325 let config = config::load_existing(config_path)?;
326 let mut notes = Vec::new();
327
328 for spec in configured_specs(&config) {
329 let pid_file = pid_file_path(&config.cache_dir, spec);
330 let Some(pid) = read_pid(&pid_file)? else {
331 notes.push(format!("{} is not managed by a kbolt pid file", spec.name));
332 continue;
333 };
334
335 if !pid_is_alive(pid) {
336 remove_pid_file(&pid_file)?;
337 notes.push(format!("removed stale pid file for {}", spec.name));
338 continue;
339 }
340
341 terminate_pid(pid)?;
342 remove_pid_file(&pid_file)?;
343 notes.push(format!("stopped {}", spec.name));
344 }
345
346 build_report(
347 &config,
348 find_llama_server_optional(),
349 LocalAction::Stop,
350 notes,
351 &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
352 )
353}
354
355pub fn local_status(config_path: Option<&Path>) -> Result<LocalReport> {
356 let config_file = config::resolve_config_file_path(config_path)?;
357 let cache_dir = default_cache_dir()?;
358 if !config_file.exists() {
359 return Ok(LocalReport {
360 action: LocalAction::Status,
361 config_file,
362 cache_dir: cache_dir.clone(),
363 llama_server_path: find_llama_server_optional(),
364 ready: false,
365 notes: vec!["kbolt is not set up yet; run `kbolt setup local`.".to_string()],
366 services: vec![
367 missing_service_report(&cache_dir, &EMBEDDER_SPEC),
368 missing_service_report(&cache_dir, &RERANKER_SPEC),
369 missing_service_report(&cache_dir, &EXPANDER_SPEC),
370 ],
371 });
372 }
373
374 let config = config::load_existing(config_path)?;
375 build_report(
376 &config,
377 find_llama_server_optional(),
378 LocalAction::Status,
379 Vec::new(),
380 &[EMBEDDER_SPEC, RERANKER_SPEC, EXPANDER_SPEC],
381 )
382}
383
/// Outcome of `prepare_service`: the port the service was placed on.
#[derive(Debug)]
struct PreparedService {
    /// Port chosen by `select_port` and used to start or reuse the service.
    port: u16,
}
388
/// State of a managed service as returned by `start_or_reuse_service`.
#[derive(Debug)]
struct RunningService {
    /// `http://127.0.0.1:<port>` URL of the service (built by `endpoint_for_port`).
    endpoint: String,
    /// Whether something is serving this endpoint; every `Ok` path sets this to `true`.
    running: bool,
    /// Whether the service passed `probe_service`.
    ready: bool,
    /// Pid of the `llama-server` process when this call spawned it; `None` when an
    /// existing server was reused.
    started_pid: Option<u32>,
}
396
397fn prepare_service(
398 config: &Config,
399 llama_server_path: &Path,
400 spec: &ManagedServiceSpec,
401 reserved_ports: &mut HashSet<u16>,
402 notes: &mut Vec<String>,
403 started: &mut Vec<u32>,
404) -> Result<PreparedService> {
405 ensure_model_file(&config.cache_dir, spec)?;
406 let port = select_port(config, spec, reserved_ports)?;
407 reserved_ports.insert(port);
408 let service = start_or_reuse_service(config, llama_server_path, spec, port)?;
409 if let Some(pid) = service.started_pid {
410 started.push(pid);
411 notes.push(started_service_note(spec.name, &service.endpoint));
412 } else if port != spec.preferred_port {
413 notes.push(format!(
414 "{} default port {} was unavailable; using {}",
415 spec.name, spec.preferred_port, port
416 ));
417 }
418 Ok(PreparedService { port })
419}
420
/// Note text recorded when kbolt spawns a service, e.g. `started embedder on http://…`.
fn started_service_note(name: &str, endpoint: &str) -> String {
    format!("started {} on {}", name, endpoint)
}
424
425fn start_or_reuse_service(
426 config: &Config,
427 llama_server_path: &Path,
428 spec: &ManagedServiceSpec,
429 port: u16,
430) -> Result<RunningService> {
431 let endpoint = endpoint_for_port(port);
432 let pid_file = pid_file_path(&config.cache_dir, spec);
433 let model_path = ensure_model_file(&config.cache_dir, spec)?;
434
435 if let Some(pid) = read_pid(&pid_file)? {
436 if pid_is_alive(pid) {
437 if probe_service(config, spec, port).is_ok() {
438 return Ok(RunningService {
439 endpoint,
440 running: true,
441 ready: true,
442 started_pid: None,
443 });
444 }
445 } else {
446 remove_pid_file(&pid_file)?;
447 }
448 }
449
450 if is_port_bound(port) {
451 let ready = probe_service(config, spec, port).is_ok();
452 return Ok(RunningService {
453 endpoint,
454 running: true,
455 ready,
456 started_pid: None,
457 });
458 }
459
460 let log_file = log_file_path(&config.cache_dir, spec);
461 let child = spawn_llama_server(llama_server_path, spec, &model_path, port, &log_file)?;
462 write_pid(&pid_file, child.id())?;
463 let pid = child.id();
464 drop(child);
465
466 let start = Instant::now();
467 loop {
468 if probe_service(config, spec, port).is_ok() {
469 return Ok(RunningService {
470 endpoint,
471 running: true,
472 ready: true,
473 started_pid: Some(pid),
474 });
475 }
476
477 if start.elapsed() >= MODEL_DOWNLOAD_TIMEOUT {
478 let _ = terminate_pid(pid);
479 let _ = remove_pid_file(&pid_file);
480 return Err(KboltError::Inference(format!(
481 "{} did not become ready on {} within {}s; check {}",
482 spec.name,
483 endpoint,
484 MODEL_DOWNLOAD_TIMEOUT.as_secs(),
485 log_file.display()
486 ))
487 .into());
488 }
489
490 thread::sleep(Duration::from_millis(250));
491 }
492}
493
494fn build_report(
495 config: &Config,
496 llama_server_path: Option<PathBuf>,
497 action: LocalAction,
498 notes: Vec<String>,
499 specs: &[ManagedServiceSpec],
500) -> Result<LocalReport> {
501 let mut services = Vec::new();
502 let mut ready = true;
503
504 for spec in specs {
505 let provider = config.providers.get(spec.provider_name);
506 let configured = role_uses_provider(config, spec.provider_name);
507 let enabled = provider.is_some();
508 let model_path = managed_model_path(&config.cache_dir, spec);
509 let port = provider
510 .map(|profile| provider_profile_port(profile))
511 .transpose()?
512 .unwrap_or(spec.preferred_port);
513 let endpoint = endpoint_for_port(port);
514 let pid_file = pid_file_path(&config.cache_dir, spec);
515 let log_file = log_file_path(&config.cache_dir, spec);
516 let pid = read_pid(&pid_file)?;
517 let managed = pid.map(pid_is_alive).unwrap_or(false);
518 let running = is_port_bound(port);
519 let probe = if enabled {
520 probe_service(config, spec, port).err()
521 } else {
522 None
523 };
524 let service_ready = enabled && probe.is_none();
525 if enabled && !service_ready {
526 ready = false;
527 }
528
529 services.push(LocalServiceReport {
530 name: spec.name.to_string(),
531 provider: spec.provider_name.to_string(),
532 enabled,
533 configured,
534 managed,
535 running,
536 ready: service_ready,
537 model: spec.model_label.to_string(),
538 model_path,
539 endpoint,
540 port,
541 pid: pid.filter(|value| pid_is_alive(*value)),
542 pid_file,
543 log_file,
544 issue: if !enabled {
545 Some("service is not configured".to_string())
546 } else if service_ready {
547 if running && !managed {
548 Some("service is reachable but not managed by kbolt".to_string())
549 } else {
550 None
551 }
552 } else {
553 Some(
554 probe
555 .map(|err| err.to_string())
556 .unwrap_or_else(|| "service is not ready".to_string()),
557 )
558 },
559 });
560 }
561
562 Ok(LocalReport {
563 action,
564 config_file: config.config_dir.join("index.toml"),
565 cache_dir: config.cache_dir.clone(),
566 llama_server_path,
567 ready,
568 notes,
569 services,
570 })
571}
572
573fn missing_service_report(cache_dir: &Path, spec: &ManagedServiceSpec) -> LocalServiceReport {
574 LocalServiceReport {
575 name: spec.name.to_string(),
576 provider: spec.provider_name.to_string(),
577 enabled: false,
578 configured: false,
579 managed: false,
580 running: false,
581 ready: false,
582 model: spec.model_label.to_string(),
583 model_path: managed_model_path(cache_dir, spec),
584 endpoint: endpoint_for_port(spec.preferred_port),
585 port: spec.preferred_port,
586 pid: None,
587 pid_file: pid_file_path(cache_dir, spec),
588 log_file: log_file_path(cache_dir, spec),
589 issue: Some("service is not configured".to_string()),
590 }
591}
592
593fn prepare_runtime_dirs(cache_dir: &Path) -> Result<()> {
594 fs::create_dir_all(cache_dir.join("models"))?;
595 fs::create_dir_all(cache_dir.join("run"))?;
596 fs::create_dir_all(cache_dir.join("logs"))?;
597 Ok(())
598}
599
600fn ensure_managed_local_base_configured(config: &Config) -> Result<()> {
601 if !config.providers.contains_key(MANAGED_EMBED_PROVIDER)
602 || !config.providers.contains_key(MANAGED_RERANK_PROVIDER)
603 {
604 return Err(KboltError::Config(
605 "managed local setup is not configured; run `kbolt setup local` first".to_string(),
606 )
607 .into());
608 }
609 Ok(())
610}
611
612fn managed_service_spec(provider_name: &str) -> Option<&'static ManagedServiceSpec> {
613 match provider_name {
614 MANAGED_EMBED_PROVIDER => Some(&EMBEDDER_SPEC),
615 MANAGED_RERANK_PROVIDER => Some(&RERANKER_SPEC),
616 MANAGED_EXPAND_PROVIDER => Some(&EXPANDER_SPEC),
617 _ => None,
618 }
619}
620
621fn managed_service_recovery_lock(spec: &ManagedServiceSpec) -> &'static Mutex<()> {
622 match spec.role {
623 ManagedRole::Embedder => EMBEDDER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
624 ManagedRole::Reranker => RERANKER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
625 ManagedRole::Expander => EXPANDER_RECOVERY_LOCK.get_or_init(|| Mutex::new(())),
626 }
627}
628
629fn configured_specs(config: &Config) -> Vec<&'static ManagedServiceSpec> {
630 let mut specs = Vec::new();
631 if config.providers.contains_key(MANAGED_EMBED_PROVIDER) {
632 specs.push(&EMBEDDER_SPEC);
633 }
634 if config.providers.contains_key(MANAGED_RERANK_PROVIDER) {
635 specs.push(&RERANKER_SPEC);
636 }
637 if config.providers.contains_key(MANAGED_EXPAND_PROVIDER) {
638 specs.push(&EXPANDER_SPEC);
639 }
640 specs
641}
642
643fn role_uses_provider(config: &Config, provider_name: &str) -> bool {
644 config
645 .roles
646 .embedder
647 .as_ref()
648 .map(|role| role.provider == provider_name)
649 .unwrap_or(false)
650 || config
651 .roles
652 .reranker
653 .as_ref()
654 .map(|role| role.provider == provider_name)
655 .unwrap_or(false)
656 || config
657 .roles
658 .expander
659 .as_ref()
660 .map(|role| role.provider == provider_name)
661 .unwrap_or(false)
662}
663
664fn apply_managed_service_config(config: &mut Config, spec: &ManagedServiceSpec, port: u16) {
665 config.providers.insert(
666 spec.provider_name.to_string(),
667 ProviderProfileConfig::LlamaCppServer {
668 operation: match spec.role {
669 ManagedRole::Embedder => ProviderOperation::Embedding,
670 ManagedRole::Reranker => ProviderOperation::Reranking,
671 ManagedRole::Expander => ProviderOperation::ChatCompletion,
672 },
673 base_url: endpoint_for_port(port),
674 model: spec.model_label.to_string(),
675 parallel_requests: match spec.role {
676 ManagedRole::Embedder => Some(MANAGED_EMBEDDER_PARALLEL_REQUESTS),
677 ManagedRole::Reranker => Some(MANAGED_RERANKER_PARALLEL_REQUESTS),
678 ManagedRole::Expander => None,
679 },
680 timeout_ms: 30_000,
681 max_retries: 2,
682 },
683 );
684
685 match spec.role {
686 ManagedRole::Embedder => {
687 config.roles.embedder = Some(EmbedderRoleConfig {
688 provider: spec.provider_name.to_string(),
689 batch_size: 32,
690 });
691 }
692 ManagedRole::Reranker => {
693 config.roles.reranker = Some(RerankerRoleConfig {
694 provider: spec.provider_name.to_string(),
695 });
696 }
697 ManagedRole::Expander => {
698 config.roles.expander = Some(ExpanderRoleConfig {
699 provider: spec.provider_name.to_string(),
700 max_tokens: 600,
701 sampling: crate::config::ExpanderSamplingConfig::default(),
702 });
703 }
704 }
705}
706
707fn ensure_model_file(cache_dir: &Path, spec: &ManagedServiceSpec) -> Result<PathBuf> {
708 let model_path = managed_model_path(cache_dir, spec);
709 if model_path.is_file() {
710 return Ok(model_path);
711 }
712
713 let download_dir = cache_dir.join("models").join(spec.name);
714 fs::create_dir_all(&download_dir)?;
715
716 let api = ApiBuilder::new()
717 .with_cache_dir(download_dir.clone())
718 .build()
719 .map_err(|err| KboltError::ModelDownload(format!("{}: {err}", spec.model_repo)))?;
720 let repo = api.repo(Repo::new(spec.model_repo.to_string(), RepoType::Model));
721 let downloaded_path = repo
722 .get(spec.model_file)
723 .map_err(|err| KboltError::ModelDownload(format!("{}: {err}", spec.model_repo)))?;
724
725 if downloaded_path != model_path {
726 fs::copy(&downloaded_path, &model_path)?;
727 }
728
729 Ok(model_path)
730}
731
732fn managed_model_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
733 cache_dir
734 .join("models")
735 .join(spec.name)
736 .join(spec.model_file)
737}
738
739fn pid_file_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
740 cache_dir.join("run").join(format!("{}.pid", spec.name))
741}
742
743fn log_file_path(cache_dir: &Path, spec: &ManagedServiceSpec) -> PathBuf {
744 cache_dir.join("logs").join(format!("{}.log", spec.name))
745}
746
747fn endpoint_for_port(port: u16) -> String {
748 format!("http://{LOCALHOST}:{port}")
749}
750
751fn default_cache_dir() -> Result<PathBuf> {
752 let base = dirs::cache_dir()
753 .ok_or_else(|| KboltError::Config("unable to determine user cache directory".into()))?;
754 Ok(base.join(APP_NAME))
755}
756
757fn select_port(
758 config: &Config,
759 spec: &ManagedServiceSpec,
760 reserved_ports: &HashSet<u16>,
761) -> Result<u16> {
762 let preferred = config
763 .providers
764 .get(spec.provider_name)
765 .map(provider_profile_port)
766 .transpose()?
767 .unwrap_or(spec.preferred_port);
768
769 if port_candidate_usable(config, spec, preferred, reserved_ports)? {
770 return Ok(preferred);
771 }
772
773 for port in preferred..(preferred + 20) {
774 if port_candidate_usable(config, spec, port, reserved_ports)? {
775 return Ok(port);
776 }
777 }
778
779 Err(KboltError::Config(format!(
780 "no free local port found for {} near {}",
781 spec.name, preferred
782 ))
783 .into())
784}
785
786fn port_candidate_usable(
787 config: &Config,
788 spec: &ManagedServiceSpec,
789 port: u16,
790 reserved_ports: &HashSet<u16>,
791) -> Result<bool> {
792 if reserved_ports.contains(&port) {
793 return Ok(false);
794 }
795
796 let pid_file = pid_file_path(&config.cache_dir, spec);
797 if let Some(pid) = read_pid(&pid_file)? {
798 if pid_is_alive(pid) && provider_port_matches(config, spec.provider_name, port)? {
799 return Ok(true);
800 }
801 }
802
803 Ok(!is_port_bound(port))
804}
805
806fn provider_port_matches(config: &Config, provider_name: &str, port: u16) -> Result<bool> {
807 let Some(profile) = config.providers.get(provider_name) else {
808 return Ok(false);
809 };
810 Ok(provider_profile_port(profile)? == port)
811}
812
813fn reserved_ports_from_config(config: &Config) -> HashSet<u16> {
814 let mut ports = HashSet::new();
815 for provider_name in [
816 MANAGED_EMBED_PROVIDER,
817 MANAGED_RERANK_PROVIDER,
818 MANAGED_EXPAND_PROVIDER,
819 ] {
820 if let Some(profile) = config.providers.get(provider_name) {
821 if let Ok(port) = provider_profile_port(profile) {
822 ports.insert(port);
823 }
824 }
825 }
826 ports
827}
828
829fn provider_port(config: &Config, provider_name: &str) -> Result<u16> {
830 let profile = config.providers.get(provider_name).ok_or_else(|| {
831 KboltError::Config(format!("missing managed provider config: {provider_name}"))
832 })?;
833 provider_profile_port(profile)
834}
835
836fn provider_profile_port(profile: &ProviderProfileConfig) -> Result<u16> {
837 let base_url = profile.base_url();
838 let port = base_url
839 .rsplit(':')
840 .next()
841 .and_then(|raw| raw.parse::<u16>().ok())
842 .ok_or_else(|| KboltError::Config(format!("invalid managed local base_url: {base_url}")))?;
843 Ok(port)
844}
845
846fn open_managed_service_log(log_file: &Path) -> Result<File> {
847 if let Some(parent) = log_file.parent() {
848 fs::create_dir_all(parent)?;
849 }
850 File::options()
851 .create(true)
852 .write(true)
853 .truncate(true)
854 .open(log_file)
855 .map_err(Into::into)
856}
857
858fn configure_llama_server_command(
859 command: &mut Command,
860 llama_server_path: &Path,
861 spec: &ManagedServiceSpec,
862 model_path: &Path,
863 port: u16,
864) {
865 command
866 .arg(llama_server_path)
867 .arg("-m")
868 .arg(model_path)
869 .arg("--port")
870 .arg(port.to_string())
871 .arg("--log-verbosity")
872 .arg(LLAMA_SERVER_LOG_VERBOSITY);
873
874 match spec.role {
875 ManagedRole::Embedder => {
876 command
877 .arg("--embedding")
878 .arg("--pooling")
879 .arg("mean")
880 .arg("-ngl")
881 .arg("99")
882 .arg("-np")
883 .arg(MANAGED_EMBEDDER_PARALLEL_REQUESTS.to_string())
884 .arg("-c")
885 .arg(
886 (MANAGED_PARALLEL_CONTEXT_TOKENS * MANAGED_EMBEDDER_PARALLEL_REQUESTS)
887 .to_string(),
888 )
889 .arg("-ub")
890 .arg(MANAGED_PARALLEL_CONTEXT_TOKENS.to_string());
891 }
892 ManagedRole::Reranker => {
893 command
894 .arg("--reranking")
895 .arg("--pooling")
896 .arg("rank")
897 .arg("-ngl")
898 .arg("99")
899 .arg("-np")
900 .arg(MANAGED_RERANKER_PARALLEL_REQUESTS.to_string())
901 .arg("-c")
902 .arg(
903 (MANAGED_PARALLEL_CONTEXT_TOKENS * MANAGED_RERANKER_PARALLEL_REQUESTS)
904 .to_string(),
905 )
906 .arg("-ub")
907 .arg(MANAGED_PARALLEL_CONTEXT_TOKENS.to_string());
908 }
909 ManagedRole::Expander => {
910 command.arg("-ngl").arg("99").arg("-c").arg("2048");
911 }
912 }
913}
914
915fn spawn_llama_server(
916 llama_server_path: &Path,
917 spec: &ManagedServiceSpec,
918 model_path: &Path,
919 port: u16,
920 log_file: &Path,
921) -> Result<std::process::Child> {
922 let stdout = open_managed_service_log(log_file)?;
923 let stderr = stdout.try_clone()?;
924
925 let mut command = if Path::new("/usr/bin/nohup").is_file() {
926 Command::new("/usr/bin/nohup")
927 } else {
928 Command::new("nohup")
929 };
930 configure_llama_server_command(&mut command, llama_server_path, spec, model_path, port);
931
932 command
933 .stdin(Stdio::null())
934 .stdout(Stdio::from(stdout))
935 .stderr(Stdio::from(stderr))
936 .spawn()
937 .map_err(|err| {
938 KboltError::Inference(format!(
939 "failed to start {} via {}: {err}",
940 spec.name,
941 llama_server_path.display()
942 ))
943 })
944 .map_err(Into::into)
945}
946
/// Checks that the managed service for `spec` on `port` is actually usable.
///
/// Builds a throwaway copy of `config` in which only this managed provider and its role
/// are configured, and asks `models::status` whether that role is ready. It then sends
/// one small real request through the inference clients:
/// - embedder: embeds one query (must yield exactly one non-empty vector) and counts
///   document tokens (must be non-zero);
/// - reranker: scores one document (must yield exactly one finite score);
/// - expander: expands one query (must yield at least one variant).
///
/// # Errors
///
/// Returns an inference error describing the first failed check. Errors from
/// `models::status`, client construction, or the requests themselves are propagated.
fn probe_service(config: &Config, spec: &ManagedServiceSpec, port: u16) -> Result<()> {
    // Start from the caller's config but drop every provider and role, so the probe
    // talks only to this one service at `port`.
    let mut probe_config = config.clone();
    probe_config.providers = HashMap::new();
    probe_config.roles.embedder = None;
    probe_config.roles.reranker = None;
    probe_config.roles.expander = None;
    apply_managed_service_config(&mut probe_config, spec, port);

    // First gate: the status check for this role must report ready.
    let status = models::status(&probe_config)?;
    let info = match spec.role {
        ManagedRole::Embedder => &status.embedder,
        ManagedRole::Reranker => &status.reranker,
        ManagedRole::Expander => &status.expander,
    };
    if !info.ready {
        return Err(KboltError::Inference(
            info.issue
                .clone()
                .unwrap_or_else(|| format!("{} is not ready", spec.name)),
        )
        .into());
    }

    // NOTE(review): presumably the "without_managed_recovery" builder is used so a failing
    // probe does not itself trigger `restart_managed_service` — confirm in `models`.
    let clients = build_inference_clients_without_managed_recovery(&probe_config)?;
    // Second gate: one real smoke request per role.
    match spec.role {
        ManagedRole::Embedder => {
            let embedder = clients.embedder.as_ref().ok_or_else(|| {
                KboltError::Inference("managed embedder client was not built".to_string())
            })?;
            let vectors = embedder.embed_batch(
                EmbeddingInputKind::Query,
                &["kbolt local probe".to_string()],
            )?;
            if vectors.len() != 1 || vectors[0].is_empty() {
                return Err(KboltError::Inference(
                    "managed embedder smoke returned an invalid embedding".to_string(),
                )
                .into());
            }
            // The embedder must also serve tokenization for document sizing.
            let sizer = clients.embedding_document_sizer.as_ref().ok_or_else(|| {
                KboltError::Inference("managed embedder tokenizer client was not built".to_string())
            })?;
            let tokens = sizer.count_document_tokens("kbolt local probe")?;
            if tokens == 0 {
                return Err(KboltError::Inference(
                    "managed embedder tokenize smoke returned zero tokens".to_string(),
                )
                .into());
            }
        }
        ManagedRole::Reranker => {
            let reranker = clients.reranker.as_ref().ok_or_else(|| {
                KboltError::Inference("managed reranker client was not built".to_string())
            })?;
            let scores = reranker.rerank(
                "kbolt local probe",
                &["kbolt local rerank document".to_string()],
            )?;
            if scores.len() != 1 || !scores[0].is_finite() {
                return Err(KboltError::Inference(
                    "managed reranker smoke returned an invalid score".to_string(),
                )
                .into());
            }
        }
        ManagedRole::Expander => {
            let expander = clients.expander.as_ref().ok_or_else(|| {
                KboltError::Inference("managed expander client was not built".to_string())
            })?;
            // NOTE(review): the `2` is presumably the requested number of variants — confirm.
            let variants = expander.expand("kbolt local probe", 2)?;
            if variants.is_empty() {
                return Err(KboltError::Inference(
                    "managed expander smoke returned no variants".to_string(),
                )
                .into());
            }
        }
    }

    Ok(())
}
1028
1029fn find_llama_server() -> Result<PathBuf> {
1030 find_llama_server_optional().ok_or_else(|| {
1031 KboltError::Config(format!(
1032 "llama-server was not found on PATH. Install llama.cpp and rerun setup. macOS hint: `{LLAMA_SERVER_BREW_HINT}`"
1033 ))
1034 .into()
1035 })
1036}
1037
/// Returns the first existing `llama-server` file found on `PATH`, falling back to the
/// common Homebrew prefixes `/opt/homebrew/bin` and `/usr/local/bin`.
fn find_llama_server_optional() -> Option<PathBuf> {
    let on_path: Vec<PathBuf> = match std::env::var_os("PATH") {
        Some(path_var) => std::env::split_paths(&path_var)
            .map(|dir| dir.join("llama-server"))
            .collect(),
        None => Vec::new(),
    };
    let fallbacks = [
        PathBuf::from("/opt/homebrew/bin/llama-server"),
        PathBuf::from("/usr/local/bin/llama-server"),
    ];
    on_path
        .into_iter()
        .chain(fallbacks)
        .find(|candidate| candidate.is_file())
}
1050
1051fn write_pid(path: &Path, pid: u32) -> Result<()> {
1052 if let Some(parent) = path.parent() {
1053 fs::create_dir_all(parent)?;
1054 }
1055 fs::write(path, format!("{pid}\n"))?;
1056 Ok(())
1057}
1058
1059fn read_pid(path: &Path) -> Result<Option<u32>> {
1060 if !path.exists() {
1061 return Ok(None);
1062 }
1063 let raw = fs::read_to_string(path)?;
1064 let pid = raw.trim().parse::<u32>().map_err(|err| {
1065 KboltError::Internal(format!("invalid pid file {}: {err}", path.display()))
1066 })?;
1067 Ok(Some(pid))
1068}
1069
1070fn remove_pid_file(path: &Path) -> Result<()> {
1071 match fs::remove_file(path) {
1072 Ok(()) => Ok(()),
1073 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
1074 Err(err) => Err(err.into()),
1075 }
1076}
1077
/// Reports whether `kill -0 <pid>` succeeds, i.e. a process with `pid` exists and can be
/// signalled. Failing to run `kill` at all counts as "not alive".
fn pid_is_alive(pid: u32) -> bool {
    let probe = Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
1089
1090fn terminate_pid(pid: u32) -> Result<()> {
1091 let term = Command::new("kill")
1092 .arg(pid.to_string())
1093 .stdin(Stdio::null())
1094 .stdout(Stdio::null())
1095 .stderr(Stdio::null())
1096 .status()
1097 .map_err(|err| KboltError::Internal(format!("failed to send SIGTERM to {pid}: {err}")))?;
1098 if !term.success() && pid_is_alive(pid) {
1099 return Err(KboltError::Internal(format!("failed to stop process {pid}")).into());
1100 }
1101
1102 let start = Instant::now();
1103 while pid_is_alive(pid) && start.elapsed() < STOP_WAIT_TIMEOUT {
1104 thread::sleep(Duration::from_millis(100));
1105 }
1106
1107 if pid_is_alive(pid) {
1108 let kill = Command::new("kill")
1109 .arg("-9")
1110 .arg(pid.to_string())
1111 .stdin(Stdio::null())
1112 .stdout(Stdio::null())
1113 .stderr(Stdio::null())
1114 .status()
1115 .map_err(|err| {
1116 KboltError::Internal(format!("failed to send SIGKILL to {pid}: {err}"))
1117 })?;
1118 if !kill.success() && pid_is_alive(pid) {
1119 return Err(KboltError::Internal(format!("failed to kill process {pid}")).into());
1120 }
1121 }
1122
1123 Ok(())
1124}
1125
1126fn is_port_bound(port: u16) -> bool {
1127 TcpListener::bind((LOCALHOST, port)).is_err()
1128}
1129
1130fn stop_started_children(started: &[u32]) {
1131 for pid in started {
1132 let _ = terminate_pid(*pid);
1133 }
1134}
1135
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::net::TcpListener;
    use std::path::Path;
    use std::process::Command;

    use tempfile::tempdir;

    use super::{
        apply_managed_service_config, configure_llama_server_command, endpoint_for_port,
        load_setup_config, managed_model_path, managed_provider_model_path, missing_service_report,
        open_managed_service_log, select_port, started_service_note, ManagedRole, EMBEDDER_SPEC,
        EXPANDER_SPEC, LLAMA_SERVER_LOG_VERBOSITY, MANAGED_EMBEDDER_PARALLEL_REQUESTS,
        MANAGED_PARALLEL_CONTEXT_TOKENS, MANAGED_RERANKER_PARALLEL_REQUESTS, RERANKER_SPEC,
    };
    use crate::config::{self, Config, ProviderProfileConfig};

    /// Loads a config from an empty `<root>/config` directory inside a fresh temporary root and
    /// points `cache_dir` at `<root>/cache`.
    ///
    /// The temporary root is kept (`keep()`) instead of being deleted when the `TempDir` guard
    /// drops, because the returned config still refers to paths inside it.
    fn temp_config() -> Config {
        let tmp = tempdir().expect("tempdir");
        let root = tmp.keep();
        let config_dir = root.join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let mut config = config::load(Some(&config_dir)).expect("load config");
        config.cache_dir = root.join("cache");
        fs::create_dir_all(&config.cache_dir).expect("cache dir");
        config
    }

    /// Returns true when `flag` is immediately followed by `value` somewhere in `args`.
    fn has_flag_value(args: &[String], flag: &str, value: &str) -> bool {
        args.windows(2).any(|pair| pair[0] == flag && pair[1] == value)
    }

    /// Asserts the three concurrency arguments expected for a role configured with `parallel`
    /// concurrent requests:
    /// - `-np <parallel>`;
    /// - `-c <MANAGED_PARALLEL_CONTEXT_TOKENS * parallel>`;
    /// - `-ub <MANAGED_PARALLEL_CONTEXT_TOKENS>`.
    fn assert_parallel_slot_args(args: &[String], role: &str, parallel: usize) {
        let parallel_arg = parallel.to_string();
        let context_tokens = (MANAGED_PARALLEL_CONTEXT_TOKENS * parallel).to_string();
        let ubatch_tokens = MANAGED_PARALLEL_CONTEXT_TOKENS.to_string();
        assert!(
            has_flag_value(args, "-np", &parallel_arg),
            "expected managed {role} parallelism in args: {args:?}"
        );
        assert!(
            has_flag_value(args, "-c", &context_tokens),
            "expected managed {role} context in args: {args:?}"
        );
        assert!(
            has_flag_value(args, "-ub", &ubatch_tokens),
            "expected managed {role} ubatch in args: {args:?}"
        );
    }

    #[test]
    fn setup_config_binds_managed_local_roles() {
        let mut config = temp_config();
        apply_managed_service_config(&mut config, &EMBEDDER_SPEC, 8101);
        apply_managed_service_config(&mut config, &RERANKER_SPEC, 8102);

        assert_eq!(
            config
                .roles
                .embedder
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_embed")
        );
        assert_eq!(
            config
                .roles
                .reranker
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_rerank")
        );
        assert!(config.providers.contains_key("kbolt_local_embed"));
        assert!(config.providers.contains_key("kbolt_local_rerank"));
        assert_eq!(
            config
                .providers
                .get("kbolt_local_embed")
                .and_then(ProviderProfileConfig::parallel_requests),
            Some(MANAGED_EMBEDDER_PARALLEL_REQUESTS)
        );
        assert_eq!(
            config
                .providers
                .get("kbolt_local_rerank")
                .and_then(ProviderProfileConfig::parallel_requests),
            Some(MANAGED_RERANKER_PARALLEL_REQUESTS)
        );
    }

    #[test]
    fn load_setup_config_moves_incompatible_index_toml_aside() {
        let tmp = tempdir().expect("tempdir");
        let config_dir = tmp.path().join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let config_file = config_dir.join("index.toml");
        fs::write(
            &config_file,
            r#"
[embeddings]
provider = "legacy"
"#,
        )
        .expect("write invalid config");

        let (config, notes) =
            load_setup_config(Some(&config_dir)).expect("setup config should recover");

        let backup_path = config_dir.join("index.toml.invalid.bak");
        assert!(backup_path.is_file(), "backup should exist");
        assert!(
            notes
                .iter()
                .any(|note| note.contains("moved incompatible legacy config")),
            "expected recovery note, got {notes:?}"
        );
        assert!(
            config.providers.is_empty(),
            "fresh config should be created"
        );
        assert!(config.roles.embedder.is_none());
        assert!(config.roles.reranker.is_none());
    }

    #[test]
    fn load_setup_config_keeps_non_legacy_invalid_config_as_error() {
        let tmp = tempdir().expect("tempdir");
        let config_dir = tmp.path().join("config");
        fs::create_dir_all(&config_dir).expect("config dir");
        let config_file = config_dir.join("index.toml");
        fs::write(
            &config_file,
            r#"
[providers.bad]
operation = "embedding"
"#,
        )
        .expect("write invalid config");

        let err = load_setup_config(Some(&config_dir)).expect_err("invalid config should fail");
        assert!(
            err.to_string().contains("invalid config file"),
            "unexpected error: {err}"
        );
        assert!(config_file.is_file(), "invalid file should remain in place");
        assert!(
            !config_dir.join("index.toml.invalid.bak").exists(),
            "non-legacy config should not be backed up automatically"
        );
    }

    #[test]
    fn enable_deep_binds_managed_expander_role() {
        let mut config = temp_config();
        apply_managed_service_config(&mut config, &EXPANDER_SPEC, 8103);

        assert_eq!(
            config
                .roles
                .expander
                .as_ref()
                .map(|role| role.provider.as_str()),
            Some("kbolt_local_expand")
        );
        assert!(config.providers.contains_key("kbolt_local_expand"));
    }

    #[test]
    fn select_port_skips_bound_ports() {
        let mut config = temp_config();
        // Keep `listener` alive for the whole test so the port stays occupied.
        let listener = TcpListener::bind(("127.0.0.1", 0)).expect("bind test port");
        let occupied_port = listener.local_addr().expect("local addr").port();
        apply_managed_service_config(&mut config, &EMBEDDER_SPEC, occupied_port);

        let port = select_port(&config, &EMBEDDER_SPEC, &Default::default()).expect("select port");
        assert_ne!(port, occupied_port);
    }

    #[test]
    fn missing_service_report_marks_unconfigured() {
        let cache = tempdir().expect("tempdir");
        let report = missing_service_report(cache.path(), &EMBEDDER_SPEC);
        assert!(!report.enabled);
        assert!(!report.ready);
        assert_eq!(report.endpoint, endpoint_for_port(8101));
    }

    #[test]
    fn managed_model_paths_are_stable() {
        let cache = tempdir().expect("tempdir");
        assert_eq!(
            managed_model_path(cache.path(), &EMBEDDER_SPEC),
            cache
                .path()
                .join("models")
                .join("embedder")
                .join("embeddinggemma-300M-Q8_0.gguf")
        );
        assert_eq!(
            managed_model_path(cache.path(), &RERANKER_SPEC),
            cache
                .path()
                .join("models")
                .join("reranker")
                .join("qwen3-reranker-0.6b-q8_0.gguf")
        );
        assert_eq!(
            managed_model_path(cache.path(), &EXPANDER_SPEC),
            cache
                .path()
                .join("models")
                .join("expander")
                .join("Qwen3-1.7B-Q8_0.gguf")
        );
        assert_eq!(
            managed_provider_model_path(cache.path(), EMBEDDER_SPEC.provider_name),
            Some(managed_model_path(cache.path(), &EMBEDDER_SPEC))
        );
        assert_eq!(
            managed_provider_model_path(cache.path(), "custom_embedder"),
            None
        );
    }

    #[test]
    fn managed_service_log_is_truncated_on_spawn_open() {
        let tmp = tempdir().expect("tempdir");
        let log_file = tmp.path().join("logs").join("embedder.log");
        fs::create_dir_all(log_file.parent().expect("log parent")).expect("create log dir");
        fs::write(&log_file, "old output\n").expect("write old log");

        {
            let mut log = open_managed_service_log(&log_file).expect("open managed log");
            log.write_all(b"new output\n").expect("write new log");
        }

        let content = fs::read_to_string(&log_file).expect("read log");
        assert_eq!(content, "new output\n");
    }

    #[test]
    fn llama_server_command_limits_managed_log_verbosity() {
        for spec in [&EMBEDDER_SPEC, &RERANKER_SPEC, &EXPANDER_SPEC] {
            let mut command = Command::new("nohup");
            configure_llama_server_command(
                &mut command,
                Path::new("/usr/local/bin/llama-server"),
                spec,
                Path::new("/tmp/model.gguf"),
                spec.preferred_port,
            );
            let args = command
                .get_args()
                .map(|arg| arg.to_string_lossy().into_owned())
                .collect::<Vec<_>>();

            assert!(
                has_flag_value(&args, "--log-verbosity", LLAMA_SERVER_LOG_VERBOSITY),
                "expected log verbosity in args for {}: {args:?}",
                spec.name
            );
            // Exhaustive on purpose: adding a managed role must force a decision here about
            // which parallelism arguments it is expected to carry.
            match spec.role {
                ManagedRole::Embedder => assert_parallel_slot_args(
                    &args,
                    "embedder",
                    MANAGED_EMBEDDER_PARALLEL_REQUESTS,
                ),
                ManagedRole::Reranker => assert_parallel_slot_args(
                    &args,
                    "reranker",
                    MANAGED_RERANKER_PARALLEL_REQUESTS,
                ),
                ManagedRole::Expander => {}
            }
        }
    }

    #[test]
    fn started_service_note_omits_model_path() {
        let note = started_service_note("embedder", "http://127.0.0.1:8101");
        assert_eq!(note, "started embedder on http://127.0.0.1:8101");
        assert!(!note.contains(".gguf"));
    }
}