Skip to main content

sqlite_graphrag/
lock.rs

1//! Semáforo de contagem via lock files para limitar invocações paralelas do CLI.
2//!
3//! `acquire_cli_slot` tenta adquirir um dos `N` slots disponíveis abrindo o arquivo
4//! `cli-slot-{N}.lock` no diretório de cache do SO e obtendo um `flock` exclusivo.
5//! O [`std::fs::File`] retornado DEVE ser mantido vivo durante toda a execução de
6//! `main`; descartá-lo libera o slot automaticamente para a próxima invocação.
7//!
8//! Quando `wait_seconds` é `Some(n) > 0`, a função faz polling a cada
9//! [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milissegundos até o deadline. Quando é `None`
10//! ou `Some(0)`, uma única tentativa é feita e `Err(AppError::AllSlotsFull)` é
11//! retornado imediatamente se todos os slots estiverem ocupados.
12
13use std::fs::{File, OpenOptions};
14use std::path::PathBuf;
15use std::thread;
16use std::time::{Duration, Instant};
17
18use directories::ProjectDirs;
19use fs4::fs_std::FileExt;
20
21use crate::constants::{CLI_LOCK_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES};
22use crate::errors::AppError;
23
24/// Retorna o caminho do arquivo de lock para o slot indicado.
25///
26/// Honra `SQLITE_GRAPHRAG_CACHE_DIR` quando definida (útil para testes, containers
27/// e caches em NFS), caindo para o diretório de cache padrão do SO via
28/// `directories::ProjectDirs`. O slot deve ser 1-based.
29fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
30    let cache = if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
31        PathBuf::from(override_dir)
32    } else {
33        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
34            AppError::Io(std::io::Error::new(
35                std::io::ErrorKind::NotFound,
36                "não foi possível determinar o diretório de cache para os lock files do sqlite-graphrag",
37            ))
38        })?;
39        dirs.cache_dir().to_path_buf()
40    };
41    std::fs::create_dir_all(&cache)?;
42    Ok(cache.join(format!("cli-slot-{slot}.lock")))
43}
44
45/// Tenta abrir e travar exclusivamente o arquivo de lock do slot indicado.
46///
47/// Retorna `Ok(file)` se o slot estiver livre, ou `Err(io::Error)` se estiver
48/// ocupado por outra instância (sem bloquear).
49fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
50    let path = slot_path(slot)?;
51    let file = OpenOptions::new()
52        .read(true)
53        .write(true)
54        .create(true)
55        .truncate(false)
56        .open(&path)?;
57    file.try_lock_exclusive().map_err(AppError::Io)?;
58    Ok(file)
59}
60
61/// Adquire um slot de concorrência no semáforo de `max_concurrency` posições.
62///
63/// Itera os slots `1..=max_concurrency` tentando `try_lock_exclusive` em cada
64/// arquivo `cli-slot-N.lock`. Quando encontra um slot livre, retorna
65/// `(File, slot_number)`. Se todos os slots estiverem ocupados:
66///
67/// - Se `wait_seconds` for `None` ou `Some(0)`, retorna imediatamente com
68///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
69/// - Se `wait_seconds` for `Some(n) > 0`, entra em loop de polling a cada
70///   [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] ms até o deadline expirar, retornando
71///   `AppError::AllSlotsFull { max, waited_secs: n }` se nenhum slot abrir.
72///
73/// O `File` retornado DEVE ser mantido vivo até o processo encerrar; descartá-lo
74/// libera o slot automaticamente via `flock` implícito no fechamento.
75pub fn acquire_cli_slot(
76    max_concurrency: usize,
77    wait_seconds: Option<u64>,
78) -> Result<(File, usize), AppError> {
79    let max = max_concurrency.clamp(1, MAX_CONCURRENT_CLI_INSTANCES);
80    let wait_secs = wait_seconds.unwrap_or(0);
81
82    // Tentativa inicial sem espera.
83    if let Some((file, slot)) = try_any_slot(max)? {
84        return Ok((file, slot));
85    }
86
87    if wait_secs == 0 {
88        return Err(AppError::AllSlotsFull {
89            max,
90            waited_secs: 0,
91        });
92    }
93
94    // Loop de polling até o deadline.
95    let deadline = Instant::now() + Duration::from_secs(wait_secs);
96    loop {
97        thread::sleep(Duration::from_millis(CLI_LOCK_POLL_INTERVAL_MS));
98        if let Some((file, slot)) = try_any_slot(max)? {
99            return Ok((file, slot));
100        }
101        if Instant::now() >= deadline {
102            return Err(AppError::AllSlotsFull {
103                max,
104                waited_secs: wait_secs,
105            });
106        }
107    }
108}
109
110/// Tenta adquirir qualquer slot livre em `1..=max`, retornando o primeiro disponível.
111///
112/// Retorna `Ok(Some((file, slot)))` se um slot foi obtido, `Ok(None)` se todos
113/// estão ocupados (`EWOULDBLOCK`). Propaga erros de I/O distintos de "lock contended".
114fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
115    for slot in 1..=max {
116        match try_acquire_slot(slot) {
117            Ok(file) => return Ok(Some((file, slot))),
118            Err(AppError::Io(e)) if e.kind() == std::io::ErrorKind::WouldBlock => {
119                continue;
120            }
121            Err(e) => return Err(e),
122        }
123    }
124    Ok(None)
125}