Skip to main content

gobby_core/provisioning/
docker.rs

1use super::*;
2
3use std::io::{BufRead as _, Write as _};
4use std::net::TcpStream;
5use std::process::Command;
6use std::time::Duration;
7
/// Settings that drive asset generation and startup of the Docker-backed services.
///
/// The ports and the FalkorDB password are written into the services `.env`
/// file by `prepare_service_assets`; a subset is also exported to the
/// `docker compose` process by `docker_compose_up_spec`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DockerServiceOptions {
    /// Gobby home directory from which the services directory is derived.
    pub gobby_home: PathBuf,
    /// Written as `GOBBY_POSTGRES_PORT` and probed by the PostgreSQL health check.
    pub postgres_port: u16,
    /// Written as `GOBBY_QDRANT_HTTP_PORT` and probed by the Qdrant health check.
    pub qdrant_http_port: u16,
    /// Written as `GOBBY_QDRANT_GRPC_PORT`.
    pub qdrant_grpc_port: u16,
    /// Host probed by the FalkorDB health check.
    pub falkordb_host: String,
    /// Written as `GOBBY_FALKORDB_PORT` and probed by the FalkorDB health check.
    pub falkordb_port: u16,
    /// Written as `GOBBY_FALKORDB_BROWSER_PORT`.
    pub falkordb_browser_port: u16,
    /// Written as `GOBBY_FALKORDB_PASSWORD`.
    pub falkordb_password: String,
}
19
20impl DockerServiceOptions {
21    pub fn new(gobby_home: PathBuf) -> Self {
22        Self {
23            gobby_home,
24            postgres_port: DEFAULT_POSTGRES_PORT,
25            qdrant_http_port: DEFAULT_QDRANT_HTTP_PORT,
26            qdrant_grpc_port: DEFAULT_QDRANT_GRPC_PORT,
27            falkordb_host: DEFAULT_FALKORDB_HOST.to_string(),
28            falkordb_port: DEFAULT_FALKORDB_PORT,
29            falkordb_browser_port: DEFAULT_FALKORDB_BROWSER_PORT,
30            falkordb_password: DEFAULT_FALKORDB_PASSWORD.to_string(),
31        }
32    }
33
34    pub fn database_url(&self) -> String {
35        default_database_url(self.postgres_port)
36    }
37
38    pub fn qdrant_url(&self) -> String {
39        format!("http://localhost:{}", self.qdrant_http_port)
40    }
41}
42
/// Locations of the files and directories laid down by `prepare_service_assets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceAssetReport {
    /// Directory that holds all generated service assets.
    pub services_dir: PathBuf,
    /// Path the compose template was written to.
    pub compose_file: PathBuf,
    /// Path of the `.env` file inside the services directory.
    pub env_file: PathBuf,
    /// The `postgres-pgsearch` directory containing the PostgreSQL image assets.
    pub postgres_asset_dir: PathBuf,
}
50
/// Summary returned after the Docker services were started and found healthy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DockerProvisioningReport {
    /// Directory that holds the generated service assets.
    pub services_dir: PathBuf,
    /// Compose file that was passed to `docker compose`.
    pub compose_file: PathBuf,
    /// `.env` file written next to the compose file.
    pub env_file: PathBuf,
    /// Compose profiles that were started.
    pub started_profiles: Vec<String>,
    /// Names of the services whose health checks were run.
    pub health_checks: Vec<String>,
}
59
/// Description of an external command to be executed by a `CommandRunner`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandSpec {
    /// Program to launch.
    pub program: String,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Extra environment variables set for the child process.
    pub env: BTreeMap<String, String>,
    /// Working directory for the child process, when one is requested.
    pub cwd: Option<PathBuf>,
}
67
/// Captured result of a finished command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Exit status code of the command; callers treat `0` as success.
    pub status: i32,
    /// Text captured from standard output.
    pub stdout: String,
    /// Text captured from standard error.
    pub stderr: String,
}
74
75pub trait CommandRunner {
76    fn run(&mut self, spec: &CommandSpec) -> std::io::Result<CommandOutput>;
77}
78
79pub struct RealCommandRunner;
80
81impl CommandRunner for RealCommandRunner {
82    fn run(&mut self, spec: &CommandSpec) -> std::io::Result<CommandOutput> {
83        let mut command = Command::new(&spec.program);
84        command.args(&spec.args);
85        if let Some(cwd) = &spec.cwd {
86            command.current_dir(cwd);
87        }
88        for (key, value) in &spec.env {
89            command.env(key, value);
90        }
91        let output = command.output()?;
92        Ok(CommandOutput {
93            status: output.status.code().unwrap_or(1),
94            stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
95            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
96        })
97    }
98}
99
100pub trait DockerHealthChecker {
101    fn wait_postgres(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
102    fn wait_qdrant(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
103    fn wait_falkordb(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
104}
105
/// Health checker that probes services over TCP, retrying on failure.
pub struct TcpDockerHealthChecker {
    /// Maximum number of attempts made for each service.
    pub retries: usize,
    /// Pause between two consecutive attempts.
    pub interval: Duration,
}

impl Default for TcpDockerHealthChecker {
    /// Thirty attempts spaced two seconds apart.
    fn default() -> Self {
        let retries = 30;
        let interval = Duration::from_secs(2);
        Self { retries, interval }
    }
}
119
120impl DockerHealthChecker for TcpDockerHealthChecker {
121    fn wait_postgres(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
122        wait_for_tcp(host, port, self.retries, self.interval)
123            .map_err(|err| anyhow::anyhow!("PostgreSQL did not become reachable: {err}"))
124    }
125
126    fn wait_qdrant(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
127        let healthz = || -> anyhow::Result<()> {
128            let mut stream = TcpStream::connect((host, port))?;
129            stream.set_read_timeout(Some(Duration::from_secs(3)))?;
130            stream.set_write_timeout(Some(Duration::from_secs(3)))?;
131            stream.write_all(b"GET /healthz HTTP/1.0\r\nHost: localhost\r\n\r\n")?;
132            let mut status_line = String::new();
133            std::io::BufReader::new(stream).read_line(&mut status_line)?;
134            if status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200") {
135                Ok(())
136            } else {
137                anyhow::bail!("unexpected Qdrant health response")
138            }
139        };
140        wait_for(healthz, self.retries, self.interval)
141            .map_err(|err| anyhow::anyhow!("Qdrant did not become healthy: {err}"))
142    }
143
144    fn wait_falkordb(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
145        wait_for_tcp(host, port, self.retries, self.interval)
146            .map_err(|err| anyhow::anyhow!("FalkorDB did not become reachable: {err}"))
147    }
148}
149
150pub fn provision_docker_services(
151    options: &DockerServiceOptions,
152) -> anyhow::Result<DockerProvisioningReport> {
153    let mut runner = RealCommandRunner;
154    let mut health = TcpDockerHealthChecker::default();
155    provision_docker_services_with(options, &mut runner, &mut health)
156}
157
158pub fn provision_docker_services_with(
159    options: &DockerServiceOptions,
160    runner: &mut impl CommandRunner,
161    health: &mut impl DockerHealthChecker,
162) -> anyhow::Result<DockerProvisioningReport> {
163    let assets = prepare_service_assets(options)?;
164    let spec = docker_compose_up_spec(options, &assets.compose_file, &assets.services_dir);
165    let output = runner.run(&spec).map_err(|err| {
166        anyhow::anyhow!("failed to execute docker compose for standalone services: {err}")
167    })?;
168    if output.status != 0 {
169        anyhow::bail!(
170            "docker compose up failed: {}",
171            first_non_empty(&output.stderr, &output.stdout)
172        );
173    }
174
175    health.wait_postgres(DEFAULT_POSTGRES_HOST, options.postgres_port)?;
176    health.wait_qdrant(DEFAULT_QDRANT_HOST, options.qdrant_http_port)?;
177    health.wait_falkordb(&options.falkordb_host, options.falkordb_port)?;
178
179    Ok(DockerProvisioningReport {
180        services_dir: assets.services_dir,
181        compose_file: assets.compose_file,
182        env_file: assets.env_file,
183        started_profiles: vec!["all".to_string()],
184        health_checks: vec![
185            "postgres".to_string(),
186            "qdrant".to_string(),
187            "falkordb".to_string(),
188        ],
189    })
190}
191
192pub fn prepare_service_assets(
193    options: &DockerServiceOptions,
194) -> anyhow::Result<ServiceAssetReport> {
195    let services = services_dir(&options.gobby_home);
196    let compose = services.join(COMPOSE_FILENAME);
197    let pgsearch = services.join("postgres-pgsearch");
198    let env_file = services.join(".env");
199
200    fs::create_dir_all(pgsearch.join("initdb.d"))?;
201    fs::create_dir_all(pgsearch.join("scripts"))?;
202    fs::write(&compose, COMPOSE_TEMPLATE)?;
203    fs::write(pgsearch.join("Dockerfile"), PGSEARCH_DOCKERFILE)?;
204    fs::write(pgsearch.join("version.json"), PGSEARCH_VERSION)?;
205    fs::write(
206        pgsearch.join("initdb.d").join("01-pg_search.sql"),
207        PGSEARCH_INIT_PG_SEARCH,
208    )?;
209    fs::write(
210        pgsearch.join("initdb.d").join("02-pgaudit.sql"),
211        PGSEARCH_INIT_PGAUDIT,
212    )?;
213    let audit_script = pgsearch.join("scripts").join("pg_audit_export.sh");
214    fs::write(&audit_script, PG_AUDIT_EXPORT)?;
215    make_executable(&audit_script)?;
216
217    let manifest = pgsearch_manifest()?;
218    update_env_file(
219        &env_file,
220        BTreeMap::from([
221            (
222                "GOBBY_PG_SEARCH_VERSION".to_string(),
223                manifest.pg_search_version,
224            ),
225            ("GOBBY_PG_SEARCH_SHA256".to_string(), manifest.sha256),
226            (
227                "GOBBY_POSTGRES_PORT".to_string(),
228                options.postgres_port.to_string(),
229            ),
230            (
231                "GOBBY_POSTGRES_DB".to_string(),
232                DEFAULT_POSTGRES_DB.to_string(),
233            ),
234            (
235                "GOBBY_POSTGRES_USER".to_string(),
236                DEFAULT_POSTGRES_USER.to_string(),
237            ),
238            (
239                "GOBBY_POSTGRES_PASSWORD".to_string(),
240                DEFAULT_POSTGRES_PASSWORD.to_string(),
241            ),
242            (
243                "GOBBY_QDRANT_HTTP_PORT".to_string(),
244                options.qdrant_http_port.to_string(),
245            ),
246            (
247                "GOBBY_QDRANT_GRPC_PORT".to_string(),
248                options.qdrant_grpc_port.to_string(),
249            ),
250            (
251                "GOBBY_FALKORDB_PORT".to_string(),
252                options.falkordb_port.to_string(),
253            ),
254            (
255                "GOBBY_FALKORDB_BROWSER_PORT".to_string(),
256                options.falkordb_browser_port.to_string(),
257            ),
258            (
259                "GOBBY_FALKORDB_PASSWORD".to_string(),
260                options.falkordb_password.clone(),
261            ),
262        ]),
263    )?;
264
265    Ok(ServiceAssetReport {
266        services_dir: services,
267        compose_file: compose,
268        env_file,
269        postgres_asset_dir: pgsearch,
270    })
271}
272
273pub fn docker_compose_up_spec(
274    options: &DockerServiceOptions,
275    compose_file: &Path,
276    services_dir: &Path,
277) -> CommandSpec {
278    CommandSpec {
279        program: "docker".to_string(),
280        args: vec![
281            "compose".to_string(),
282            "-f".to_string(),
283            compose_file.display().to_string(),
284            "--profile".to_string(),
285            "all".to_string(),
286            "up".to_string(),
287            "-d".to_string(),
288            "--remove-orphans".to_string(),
289        ],
290        env: BTreeMap::from([
291            (
292                "GOBBY_FALKORDB_PASSWORD".to_string(),
293                options.falkordb_password.clone(),
294            ),
295            (
296                "GOBBY_POSTGRES_PORT".to_string(),
297                options.postgres_port.to_string(),
298            ),
299            (
300                "GOBBY_QDRANT_HTTP_PORT".to_string(),
301                options.qdrant_http_port.to_string(),
302            ),
303        ]),
304        cwd: Some(services_dir.to_path_buf()),
305    }
306}
307
308#[derive(Debug, Deserialize)]
309struct PgSearchVersionFile {
310    pg_search_version: String,
311    pg_search_sha256: String,
312    pg_search_sha256_by_arch: Option<BTreeMap<String, String>>,
313}
314
/// pg_search version and the checksum selected for the current architecture.
struct PgSearchManifest {
    /// pg_search version string, written as `GOBBY_PG_SEARCH_VERSION`.
    pg_search_version: String,
    /// Selected checksum, written as `GOBBY_PG_SEARCH_SHA256`.
    sha256: String,
}
319
320fn pgsearch_manifest() -> anyhow::Result<PgSearchManifest> {
321    let parsed: PgSearchVersionFile = serde_json::from_str(PGSEARCH_VERSION)?;
322    let arch = debian_arch(std::env::consts::ARCH);
323    let sha256 = parsed
324        .pg_search_sha256_by_arch
325        .and_then(|by_arch| by_arch.get(&arch).cloned())
326        .unwrap_or(parsed.pg_search_sha256);
327    Ok(PgSearchManifest {
328        pg_search_version: parsed.pg_search_version,
329        sha256,
330    })
331}
332
/// Maps an architecture name to `amd64` or `arm64` where it is a known alias.
///
/// `x86_64` and `amd64` become `amd64`; `aarch64` and `arm64` become `arm64`;
/// every other name is returned unchanged.
fn debian_arch(arch: &str) -> String {
    let normalized = if matches!(arch, "x86_64" | "amd64") {
        "amd64"
    } else if matches!(arch, "aarch64" | "arm64") {
        "arm64"
    } else {
        arch
    };
    normalized.to_owned()
}
340
341fn update_env_file(path: &Path, updates: BTreeMap<String, String>) -> anyhow::Result<()> {
342    if let Some(parent) = path.parent() {
343        fs::create_dir_all(parent)?;
344    }
345    let mut lines = Vec::new();
346    if path.exists() {
347        for line in fs::read_to_string(path)?.lines() {
348            let key = line.split_once('=').map(|(key, _)| key).unwrap_or(line);
349            if !updates.contains_key(key) {
350                lines.push(line.to_string());
351            }
352        }
353        if lines.last().is_some_and(|line| !line.trim().is_empty()) {
354            lines.push(String::new());
355        }
356    }
357    for (key, value) in updates {
358        lines.push(format!("{key}={value}"));
359    }
360    fs::write(path, format!("{}\n", lines.join("\n")))?;
361    Ok(())
362}
363
/// Returns `first` trimmed, unless it is empty after trimming, in which case
/// `second` trimmed is returned (which may itself be empty).
fn first_non_empty<'a>(first: &'a str, second: &'a str) -> &'a str {
    match first.trim() {
        "" => second.trim(),
        trimmed => trimmed,
    }
}
371
372fn wait_for_tcp(host: &str, port: u16, retries: usize, interval: Duration) -> anyhow::Result<()> {
373    wait_for(
374        || {
375            TcpStream::connect((host, port))
376                .map(|_| ())
377                .map_err(Into::into)
378        },
379        retries,
380        interval,
381    )
382}
383
384fn wait_for(
385    mut check: impl FnMut() -> anyhow::Result<()>,
386    retries: usize,
387    interval: Duration,
388) -> anyhow::Result<()> {
389    if retries == 0 {
390        anyhow::bail!("health check failed without retries");
391    }
392    let mut last_error = None;
393    for attempt in 0..retries {
394        match check() {
395            Ok(()) => return Ok(()),
396            Err(err) => last_error = Some(err),
397        }
398        if attempt + 1 < retries {
399            std::thread::sleep(interval);
400        }
401    }
402    Err(last_error.unwrap_or_else(|| anyhow::anyhow!("health check failed")))
403}
404
405fn make_executable(path: &Path) -> anyhow::Result<()> {
406    #[cfg(unix)]
407    {
408        use std::os::unix::fs::PermissionsExt;
409        let mut permissions = fs::metadata(path)?.permissions();
410        permissions.set_mode(0o755);
411        fs::set_permissions(path, permissions)?;
412    }
413    #[cfg(not(unix))]
414    {
415        let _ = path;
416    }
417    Ok(())
418}