sqlite_graphrag/commands/
slots.rs1use clap::{Args, Subcommand};
10use serde::Serialize;
11
12use crate::errors::AppError;
13use crate::llm_slots::{slot_path, slots_dir};
14use crate::output::emit_json_compact;
15use crate::output::OutputFormat;
16
// Top-level arguments of the `slots` command; all behavior lives in the chosen
// subcommand. (Plain `//` comments: clap would turn `///` into help/about text.)
#[derive(Debug, Args)]
pub struct SlotsArgs {
    // The subcommand selected on the command line (status / release / cleanup).
    #[command(subcommand)]
    pub cmd: SlotsCmd,
}
24
25#[derive(Debug, Subcommand)]
26pub enum SlotsCmd {
27 Status(SlotsStatusArgs),
29 Release {
31 #[arg(long)]
33 slot_id: u32,
34 #[arg(long)]
36 yes: bool,
37 #[arg(long, hide = true)]
39 json: bool,
40 },
41 Cleanup {
43 #[arg(long, default_value_t = 3600)]
45 stale_after: u64,
46 #[arg(long)]
48 yes: bool,
49 #[arg(long)]
51 dry_run: bool,
52 },
53}
54
55#[derive(Debug, clap::Args)]
56pub struct SlotsStatusArgs {
57 #[arg(long, value_enum, default_value_t = OutputFormat::Json)]
59 pub format: OutputFormat,
60 #[arg(long, hide = true)]
62 pub json: bool,
63}
64
/// One occupied slot as reported by `slots status`.
///
/// Field order is the JSON key order of the serialized report.
#[derive(Serialize)]
struct SlotEntry {
    /// Slot index in `0..max_concurrency`.
    slot_id: u32,
    /// Slot file path, lossily converted to UTF-8.
    path: String,
    /// Seconds since the slot file was last modified (`0` when unknown).
    age_secs: u64,
    /// Integer parsed from the trimmed slot file contents, if it parses as `u32`.
    /// NOTE(review): presumably the holder's PID — confirm in `llm_slots`.
    pid_hint: Option<u32>,
}
72
/// Report assembled by `slots status`; serialized as JSON in JSON output mode.
///
/// Field order is the JSON key order of the serialized report.
#[derive(Serialize)]
struct SlotsStatusOutput {
    /// Always `"slots_status"`.
    action: &'static str,
    /// Number of slot ids probed (`0..max_concurrency`).
    max_concurrency: u32,
    /// Number of probed slots whose slot file exists.
    active: usize,
    /// `max_concurrency - active`, saturating at zero.
    free: usize,
    /// One entry per occupied slot, in ascending `slot_id` order.
    slots: Vec<SlotEntry>,
    /// Wall-clock time spent building the report, in milliseconds.
    elapsed_ms: u64,
}
82
83pub fn run(args: SlotsArgs) -> Result<(), AppError> {
84 run_cmd(args.cmd)
85}
86
87fn run_cmd(cmd: SlotsCmd) -> Result<(), AppError> {
88 match cmd {
89 SlotsCmd::Status(args) => run_status(args),
90 SlotsCmd::Release {
91 slot_id,
92 yes,
93 json: _,
94 } => run_release(slot_id, yes),
95 SlotsCmd::Cleanup {
96 stale_after,
97 yes,
98 dry_run,
99 } => run_cleanup(stale_after, yes, dry_run),
100 }
101}
102
103fn run_status(args: SlotsStatusArgs) -> Result<(), AppError> {
104 let start = std::time::Instant::now();
105 let max = crate::llm_slots::default_max_concurrency();
106 let dir = slots_dir();
107 let mut entries: Vec<SlotEntry> = Vec::new();
108
109 if dir.is_dir() {
110 for slot_id in 0..max {
111 let path = slot_path(slot_id);
112 if path.is_file() {
113 let age_secs = path
114 .metadata()
115 .and_then(|m| m.modified())
116 .ok()
117 .and_then(|t| t.elapsed().ok())
118 .map(|d| d.as_secs())
119 .unwrap_or(0);
120 let pid_hint = std::fs::read_to_string(&path)
121 .ok()
122 .and_then(|s| s.trim().parse::<u32>().ok());
123 entries.push(SlotEntry {
124 slot_id,
125 path: path.to_string_lossy().into_owned(),
126 age_secs,
127 pid_hint,
128 });
129 }
130 }
131 }
132
133 let output = SlotsStatusOutput {
134 action: "slots_status",
135 max_concurrency: max,
136 active: entries.len(),
137 free: (max as usize).saturating_sub(entries.len()),
138 slots: entries,
139 elapsed_ms: start.elapsed().as_millis() as u64,
140 };
141
142 if matches!(args.format, OutputFormat::Json) {
143 let json = serde_json::to_string_pretty(&output).map_err(AppError::Json)?;
144 println!("{json}");
146 } else {
147 tracing::info!(target: "slots", max_concurrency = output.max_concurrency, "slot status");
152 tracing::info!(
153 target: "slots",
154 active = output.active,
155 free = output.free,
156 "slot occupancy"
157 );
158 for s in &output.slots {
159 let pid = s.pid_hint.map(|p| p.to_string()).unwrap_or_default();
160 tracing::info!(
161 target: "slots",
162 slot_id = s.slot_id,
163 age_secs = s.age_secs,
164 pid = %pid,
165 path = %s.path,
166 "slot entry"
167 );
168 }
169 }
170 Ok(())
171}
172
173fn run_release(slot_id: u32, yes: bool) -> Result<(), AppError> {
174 let path = slot_path(slot_id);
175 if !path.is_file() {
176 return Err(AppError::NotFound(format!(
177 "slot {slot_id} is not held (no file at {})",
178 path.display()
179 )));
180 }
181 if !yes {
182 return Err(AppError::Validation(format!(
183 "refusing to release slot {slot_id} without --yes (file: {})",
184 path.display()
185 )));
186 }
187 std::fs::remove_file(&path).map_err(AppError::Io)?;
188 let out = serde_json::json!({
189 "action": "slot_released",
190 "slot_id": slot_id,
191 "path": path.to_string_lossy(),
192 });
193 let _ = emit_json_compact(&out);
194 Ok(())
195}
196
197fn run_cleanup(stale_after: u64, yes: bool, dry_run: bool) -> Result<(), AppError> {
198 let start = std::time::Instant::now();
199 let max = crate::llm_slots::default_max_concurrency();
200 let mut removed: Vec<u32> = Vec::new();
201 for slot_id in 0..max {
202 let path = slot_path(slot_id);
203 if !path.is_file() {
204 continue;
205 }
206 let age = path
207 .metadata()
208 .and_then(|m| m.modified())
209 .ok()
210 .and_then(|t| t.elapsed().ok())
211 .map(|d| d.as_secs())
212 .unwrap_or(0);
213 if age >= stale_after {
214 if !dry_run {
215 if let Err(e) = std::fs::remove_file(&path) {
216 tracing::warn!(target: "slots", slot_id, error = %e, "stale slot removal failed");
217 continue;
218 }
219 }
220 removed.push(slot_id);
221 }
222 }
223 let out = serde_json::json!({
224 "action": if dry_run { "slots_cleanup_dry_run" } else { "slots_cleanup" },
225 "stale_after_secs": stale_after,
226 "removed": removed,
227 "removed_count": removed.len(),
228 "elapsed_ms": start.elapsed().as_millis() as u64,
229 "yes": yes,
230 });
231 let _ = emit_json_compact(&out);
232 Ok(())
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm_slots::acquire_llm_slot;

    /// Acquiring a slot creates its slot file; dropping the guard removes it again.
    #[test]
    fn acquire_then_drop_releases_slot() {
        // Start from an empty slots directory so leftover files cannot occupy the slots
        // this test needs. NOTE(review): this deletes whatever `slots_dir()` resolves
        // to, including real slot files on a developer machine, unless that path is
        // redirected for tests — confirm.
        let dir = crate::llm_slots::slots_dir();
        let _ = std::fs::remove_dir_all(&dir);

        let slot = acquire_llm_slot(2, 5).expect("acquire");
        let slot_file = slot_path(slot.slot_id());
        assert!(slot_file.is_file(), "slot file must exist after acquire");

        std::mem::drop(slot);
        assert!(
            !slot_file.is_file(),
            "slot file must be removed after Drop (RAII guarantee)"
        );
    }
}