leann_core/embedding/
manager.rs1use anyhow::Result;
2use std::net::TcpListener;
3use std::process::{Child, Command};
4use std::time::{Duration, Instant};
5use tracing::{info, warn};
6
7pub struct EmbeddingServerManager {
9 process: Option<Child>,
11 port: Option<u16>,
13 config_signature: Option<ConfigSignature>,
15}
16
/// The launch parameters that identify a running embedding server.
///
/// Two start requests with equal signatures can share one server process.
/// All fields are plain strings, so equality is total and `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ConfigSignature {
    /// Value passed to the server as `--model-name`.
    model_name: String,
    /// Value passed to the server as `--embedding-mode`.
    embedding_mode: String,
    /// Value passed to the server as `--passages-file` (may be empty).
    passages_file: String,
}
23
24impl EmbeddingServerManager {
25 pub fn new() -> Self {
26 Self {
27 process: None,
28 port: None,
29 config_signature: None,
30 }
31 }
32
33 pub fn start_server(
35 &mut self,
36 port: u16,
37 model_name: &str,
38 embedding_mode: &str,
39 passages_file: &str,
40 leann_binary: &str,
41 ) -> Result<u16> {
42 let new_sig = ConfigSignature {
43 model_name: model_name.to_string(),
44 embedding_mode: embedding_mode.to_string(),
45 passages_file: passages_file.to_string(),
46 };
47
48 if let (Some(process), Some(port), Some(sig)) =
50 (&mut self.process, self.port, &self.config_signature)
51 {
52 if sig == &new_sig {
53 match process.try_wait() {
54 Ok(None) => {
55 info!("Reusing existing embedding server on port {}", port);
56 return Ok(port);
57 }
58 _ => {
59 }
61 }
62 } else {
63 self.stop_server();
65 }
66 }
67
68 let actual_port = find_available_port(port)?;
70
71 let mut cmd = Command::new(leann_binary);
73 cmd.arg("serve-embeddings")
74 .arg("--port")
75 .arg(actual_port.to_string())
76 .arg("--model-name")
77 .arg(model_name);
78
79 if !passages_file.is_empty() {
80 cmd.arg("--passages-file").arg(passages_file);
81 }
82 if embedding_mode != "sentence-transformers" {
83 cmd.arg("--embedding-mode").arg(embedding_mode);
84 }
85
86 info!("Starting embedding server on port {}", actual_port);
87 let child = cmd
88 .spawn()
89 .map_err(|e| anyhow::anyhow!("Failed to start embedding server: {}", e))?;
90
91 self.process = Some(child);
92 self.port = Some(actual_port);
93 self.config_signature = Some(new_sig);
94
95 self.wait_for_ready(actual_port, Duration::from_secs(120))?;
97
98 Ok(actual_port)
99 }
100
101 pub fn stop_server(&mut self) {
103 if let Some(mut process) = self.process.take() {
104 info!("Terminating embedding server process");
105
106 #[cfg(unix)]
108 {
109 unsafe {
110 libc::kill(process.id() as i32, libc::SIGTERM);
111 }
112 }
113 #[cfg(not(unix))]
114 {
115 let _ = process.kill();
116 }
117
118 match wait_with_timeout(&mut process, Duration::from_secs(5)) {
120 Ok(_) => info!("Server process terminated gracefully"),
121 Err(_) => {
122 warn!("Server did not terminate in time, force killing");
123 let _ = process.kill();
124 let _ = process.wait();
125 }
126 }
127 }
128
129 self.port = None;
130 self.config_signature = None;
131 }
132
133 pub fn port(&self) -> Option<u16> {
135 self.port
136 }
137
138 pub fn is_alive(&mut self) -> bool {
140 if let Some(ref mut process) = self.process {
141 matches!(process.try_wait(), Ok(None))
142 } else {
143 false
144 }
145 }
146
147 fn wait_for_ready(&self, port: u16, timeout: Duration) -> Result<()> {
148 let start = Instant::now();
149 let check_interval = Duration::from_millis(500);
150
151 while start.elapsed() < timeout {
152 if is_port_in_use(port) {
153 info!("Embedding server ready on port {}", port);
154 return Ok(());
155 }
156
157 std::thread::sleep(check_interval);
161 }
162
163 anyhow::bail!(
164 "Embedding server failed to start within {} seconds",
165 timeout.as_secs()
166 )
167 }
168}
169
170impl Default for EmbeddingServerManager {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl Drop for EmbeddingServerManager {
177 fn drop(&mut self) {
178 self.stop_server();
179 }
180}
181
182fn find_available_port(start: u16) -> Result<u16> {
183 for port in start..start + 100 {
184 if TcpListener::bind(("localhost", port)).is_ok() {
185 return Ok(port);
186 }
187 }
188 anyhow::bail!(
189 "No available ports found in range {}-{}",
190 start,
191 start + 100
192 )
193}
194
/// Reports whether a TCP connection to `localhost:port` can be established,
/// i.e. whether something is accepting connections there.
fn is_port_in_use(port: u16) -> bool {
    let probe = std::net::TcpStream::connect(("localhost", port));
    matches!(probe, Ok(_))
}
198
199fn wait_with_timeout(child: &mut Child, timeout: Duration) -> Result<()> {
200 let start = Instant::now();
201 loop {
202 match child.try_wait()? {
203 Some(_) => return Ok(()),
204 None => {
205 if start.elapsed() > timeout {
206 anyhow::bail!("Timeout waiting for process");
207 }
208 std::thread::sleep(Duration::from_millis(100));
209 }
210 }
211 }
212}