Skip to main content

leann_core/embedding/
manager.rs

1use anyhow::Result;
2use std::net::TcpListener;
3use std::process::{Child, Command};
4use std::time::{Duration, Instant};
5use tracing::{info, warn};
6
7/// Manages embedding server subprocess lifecycle.
8pub struct EmbeddingServerManager {
9    /// The running server process, if any.
10    process: Option<Child>,
11    /// Port the server is listening on.
12    port: Option<u16>,
13    /// Configuration signature for reuse detection.
14    config_signature: Option<ConfigSignature>,
15}
16
/// The launch parameters of an embedding server; two servers with equal signatures are
/// considered interchangeable for reuse.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ConfigSignature {
    model_name: String,
    embedding_mode: String,
    passages_file: String,
}
23
24impl EmbeddingServerManager {
25    pub fn new() -> Self {
26        Self {
27            process: None,
28            port: None,
29            config_signature: None,
30        }
31    }
32
33    /// Start the embedding server, reusing an existing one if the config matches.
34    pub fn start_server(
35        &mut self,
36        port: u16,
37        model_name: &str,
38        embedding_mode: &str,
39        passages_file: &str,
40        leann_binary: &str,
41    ) -> Result<u16> {
42        let new_sig = ConfigSignature {
43            model_name: model_name.to_string(),
44            embedding_mode: embedding_mode.to_string(),
45            passages_file: passages_file.to_string(),
46        };
47
48        // Reuse existing server if config matches and process is alive
49        if let (Some(process), Some(port), Some(sig)) =
50            (&mut self.process, self.port, &self.config_signature)
51        {
52            if sig == &new_sig {
53                match process.try_wait() {
54                    Ok(None) => {
55                        info!("Reusing existing embedding server on port {}", port);
56                        return Ok(port);
57                    }
58                    _ => {
59                        // Process died, need to restart
60                    }
61                }
62            } else {
63                // Config changed, stop existing server
64                self.stop_server();
65            }
66        }
67
68        // Find available port
69        let actual_port = find_available_port(port)?;
70
71        // Build command
72        let mut cmd = Command::new(leann_binary);
73        cmd.arg("serve-embeddings")
74            .arg("--port")
75            .arg(actual_port.to_string())
76            .arg("--model-name")
77            .arg(model_name);
78
79        if !passages_file.is_empty() {
80            cmd.arg("--passages-file").arg(passages_file);
81        }
82        if embedding_mode != "sentence-transformers" {
83            cmd.arg("--embedding-mode").arg(embedding_mode);
84        }
85
86        info!("Starting embedding server on port {}", actual_port);
87        let child = cmd
88            .spawn()
89            .map_err(|e| anyhow::anyhow!("Failed to start embedding server: {}", e))?;
90
91        self.process = Some(child);
92        self.port = Some(actual_port);
93        self.config_signature = Some(new_sig);
94
95        // Wait for server to be ready
96        self.wait_for_ready(actual_port, Duration::from_secs(120))?;
97
98        Ok(actual_port)
99    }
100
101    /// Stop the embedding server if running.
102    pub fn stop_server(&mut self) {
103        if let Some(mut process) = self.process.take() {
104            info!("Terminating embedding server process");
105
106            // Try graceful shutdown first
107            #[cfg(unix)]
108            {
109                unsafe {
110                    libc::kill(process.id() as i32, libc::SIGTERM);
111                }
112            }
113            #[cfg(not(unix))]
114            {
115                let _ = process.kill();
116            }
117
118            // Wait with timeout
119            match wait_with_timeout(&mut process, Duration::from_secs(5)) {
120                Ok(_) => info!("Server process terminated gracefully"),
121                Err(_) => {
122                    warn!("Server did not terminate in time, force killing");
123                    let _ = process.kill();
124                    let _ = process.wait();
125                }
126            }
127        }
128
129        self.port = None;
130        self.config_signature = None;
131    }
132
133    /// Get the port the server is listening on, if running.
134    pub fn port(&self) -> Option<u16> {
135        self.port
136    }
137
138    /// Check if the server process is still alive.
139    pub fn is_alive(&mut self) -> bool {
140        if let Some(ref mut process) = self.process {
141            matches!(process.try_wait(), Ok(None))
142        } else {
143            false
144        }
145    }
146
147    fn wait_for_ready(&self, port: u16, timeout: Duration) -> Result<()> {
148        let start = Instant::now();
149        let check_interval = Duration::from_millis(500);
150
151        while start.elapsed() < timeout {
152            if is_port_in_use(port) {
153                info!("Embedding server ready on port {}", port);
154                return Ok(());
155            }
156
157            // Check if process died
158            // We can't call try_wait on an immutable reference, just sleep and check port
159
160            std::thread::sleep(check_interval);
161        }
162
163        anyhow::bail!(
164            "Embedding server failed to start within {} seconds",
165            timeout.as_secs()
166        )
167    }
168}
169
170impl Default for EmbeddingServerManager {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl Drop for EmbeddingServerManager {
177    fn drop(&mut self) {
178        self.stop_server();
179    }
180}
181
182fn find_available_port(start: u16) -> Result<u16> {
183    for port in start..start + 100 {
184        if TcpListener::bind(("localhost", port)).is_ok() {
185            return Ok(port);
186        }
187    }
188    anyhow::bail!(
189        "No available ports found in range {}-{}",
190        start,
191        start + 100
192    )
193}
194
/// Reports whether something on `localhost` currently accepts TCP connections on `port`.
///
/// Any successful connection counts. The probe stream is closed when it is dropped at the end
/// of the function.
fn is_port_in_use(port: u16) -> bool {
    let probe = std::net::TcpStream::connect(("localhost", port));
    probe.is_ok()
}
198
199fn wait_with_timeout(child: &mut Child, timeout: Duration) -> Result<()> {
200    let start = Instant::now();
201    loop {
202        match child.try_wait()? {
203            Some(_) => return Ok(()),
204            None => {
205                if start.elapsed() > timeout {
206                    anyhow::bail!("Timeout waiting for process");
207                }
208                std::thread::sleep(Duration::from_millis(100));
209            }
210        }
211    }
212}