Skip to main content

eure_ls/native/
io_pool.rs

1//! IO thread pool for file reading.
2
3use std::fs;
4use std::thread::{self, JoinHandle};
5
6#[cfg(feature = "http")]
7use anyhow::anyhow;
8use crossbeam_channel::{Receiver, Sender, unbounded};
9#[cfg(feature = "http")]
10use eure::query::fetch_url;
11use eure::query::{TextFile, TextFileContent};
12
13/// Request to read a file from disk.
14pub struct IoRequest {
15    pub file: TextFile,
16}
17
18/// Response from reading a file.
19pub struct IoResponse {
20    pub file: TextFile,
21    /// Content or error from fetching the file.
22    pub result: Result<TextFileContent, anyhow::Error>,
23}
24
25/// Thread pool for handling file I/O operations.
26///
27/// This allows the main event loop to remain responsive while
28/// files are being read from disk.
29pub struct IoPool {
30    /// Channel to send file read requests to workers.
31    request_tx: Sender<IoRequest>,
32    /// Channel to receive file read responses from workers.
33    response_rx: Receiver<IoResponse>,
34    /// Worker thread handles (kept for cleanup).
35    _workers: Vec<JoinHandle<()>>,
36}
37
38impl IoPool {
39    /// Create a new IO pool with the specified number of worker threads.
40    pub fn new(num_workers: usize) -> Self {
41        let (request_tx, request_rx) = unbounded::<IoRequest>();
42        let (response_tx, response_rx) = unbounded::<IoResponse>();
43
44        let mut workers = Vec::with_capacity(num_workers);
45
46        for i in 0..num_workers {
47            let request_rx = request_rx.clone();
48            let response_tx = response_tx.clone();
49
50            let handle = thread::Builder::new()
51                .name(format!("eure-ls-io-{}", i))
52                .spawn(move || {
53                    worker_loop(request_rx, response_tx);
54                })
55                .expect("failed to spawn IO worker thread");
56
57            workers.push(handle);
58        }
59
60        Self {
61            request_tx,
62            response_rx,
63            _workers: workers,
64        }
65    }
66
67    /// Request a file to be read.
68    ///
69    /// The result will be available via `receiver()`.
70    pub fn request_file(&self, file: TextFile) {
71        // Ignore send errors - they only happen if all workers have died
72        let _ = self.request_tx.send(IoRequest { file });
73    }
74
75    /// Get the receiver for file read responses.
76    ///
77    /// Use this with `crossbeam_channel::select!` to wait for responses.
78    pub fn receiver(&self) -> &Receiver<IoResponse> {
79        &self.response_rx
80    }
81}
82
83/// Worker loop that reads files from disk.
84fn worker_loop(request_rx: Receiver<IoRequest>, response_tx: Sender<IoResponse>) {
85    for request in request_rx {
86        let result = read_file(&request.file);
87        let response = IoResponse {
88            file: request.file,
89            result,
90        };
91
92        // If the main thread has stopped listening, just exit
93        if response_tx.send(response).is_err() {
94            break;
95        }
96    }
97}
98
99/// Read a file from disk or fetch from URL and return its content.
100fn read_file(file: &TextFile) -> Result<TextFileContent, anyhow::Error> {
101    match file {
102        TextFile::Local(path) => Ok(TextFileContent(fs::read_to_string(path.as_ref())?)),
103        #[cfg(feature = "http")]
104        TextFile::Remote(url) => match fetch_url(url) {
105            Ok(content) => Ok(content),
106            Err(e) => Err(anyhow!("Failed to fetch {}: {}", url, e)),
107        },
108        #[cfg(not(feature = "http"))]
109        TextFile::Remote(url) => Err(anyhow::anyhow!(
110            "HTTP support not enabled, cannot fetch {}",
111            url
112        )),
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119    use std::path::PathBuf;
120
121    #[test]
122    fn test_read_nonexistent_file() {
123        let file = TextFile::from_path(PathBuf::from("/nonexistent/path/to/file.eure"));
124        let result = read_file(&file);
125        assert!(
126            result
127                .unwrap_err()
128                .downcast_ref::<std::io::Error>()
129                .is_some()
130        );
131    }
132}