1use super::*;
2
3use std::io::{BufRead as _, Write as _};
4use std::net::TcpStream;
5use std::process::Command;
6use std::time::Duration;
7
/// Settings used to render the service assets, start the containers, and probe their health.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DockerServiceOptions {
    /// Base directory handed to `services_dir` to locate where service assets are written.
    pub gobby_home: PathBuf,
    /// Written to `.env` as `GOBBY_POSTGRES_PORT`, exported to `docker compose`, and probed by
    /// the PostgreSQL health check.
    pub postgres_port: u16,
    /// Written to `.env` as `GOBBY_QDRANT_HTTP_PORT`, exported to `docker compose`, and probed by
    /// the Qdrant health check.
    pub qdrant_http_port: u16,
    /// Written to `.env` as `GOBBY_QDRANT_GRPC_PORT`.
    pub qdrant_grpc_port: u16,
    /// Host the FalkorDB health check connects to.
    pub falkordb_host: String,
    /// Written to `.env` as `GOBBY_FALKORDB_PORT` and probed by the FalkorDB health check.
    pub falkordb_port: u16,
    /// Written to `.env` as `GOBBY_FALKORDB_BROWSER_PORT`.
    pub falkordb_browser_port: u16,
    /// Written to `.env` as `GOBBY_FALKORDB_PASSWORD` and exported to `docker compose`.
    pub falkordb_password: String,
}
19
20impl DockerServiceOptions {
21 pub fn new(gobby_home: PathBuf) -> Self {
22 Self {
23 gobby_home,
24 postgres_port: DEFAULT_POSTGRES_PORT,
25 qdrant_http_port: DEFAULT_QDRANT_HTTP_PORT,
26 qdrant_grpc_port: DEFAULT_QDRANT_GRPC_PORT,
27 falkordb_host: DEFAULT_FALKORDB_HOST.to_string(),
28 falkordb_port: DEFAULT_FALKORDB_PORT,
29 falkordb_browser_port: DEFAULT_FALKORDB_BROWSER_PORT,
30 falkordb_password: DEFAULT_FALKORDB_PASSWORD.to_string(),
31 }
32 }
33
34 pub fn database_url(&self) -> String {
35 default_database_url(self.postgres_port)
36 }
37
38 pub fn qdrant_url(&self) -> String {
39 format!("http://localhost:{}", self.qdrant_http_port)
40 }
41}
42
/// Paths of the assets written by `prepare_service_assets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceAssetReport {
    /// Directory returned by `services_dir` for the configured home directory.
    pub services_dir: PathBuf,
    /// Compose file (`COMPOSE_FILENAME`) inside `services_dir`.
    pub compose_file: PathBuf,
    /// The `.env` file inside `services_dir`.
    pub env_file: PathBuf,
    /// The `postgres-pgsearch` directory inside `services_dir`.
    pub postgres_asset_dir: PathBuf,
}
50
/// Summary returned after the services were started and all health checks passed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DockerProvisioningReport {
    /// Directory holding the generated service assets.
    pub services_dir: PathBuf,
    /// Compose file that was passed to `docker compose`.
    pub compose_file: PathBuf,
    /// The `.env` file written next to the compose file.
    pub env_file: PathBuf,
    /// Compose profiles that were started (currently always `["all"]`).
    pub started_profiles: Vec<String>,
    /// Names of the health checks that passed, in the order they were run.
    pub health_checks: Vec<String>,
}
59
/// Description of an external command for a `CommandRunner` to execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandSpec {
    /// Program to launch.
    pub program: String,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Environment variables set for the child process.
    pub env: BTreeMap<String, String>,
    /// Working directory for the child process; `None` leaves it unset.
    pub cwd: Option<PathBuf>,
}
67
/// Captured result of a finished command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Exit code; `RealCommandRunner` reports `1` when the process has no exit code.
    pub status: i32,
    /// Captured standard output (decoded lossily as UTF-8 by `RealCommandRunner`).
    pub stdout: String,
    /// Captured standard error (decoded lossily as UTF-8 by `RealCommandRunner`).
    pub stderr: String,
}
74
/// Executes commands described by a `CommandSpec`; `provision_docker_services_with` takes any
/// implementation, so the runner can be swapped out.
pub trait CommandRunner {
    /// Runs `spec` and returns its captured output, or an I/O error if it could not be run.
    fn run(&mut self, spec: &CommandSpec) -> std::io::Result<CommandOutput>;
}
78
/// `CommandRunner` that spawns real child processes via `std::process::Command`.
pub struct RealCommandRunner;
80
81impl CommandRunner for RealCommandRunner {
82 fn run(&mut self, spec: &CommandSpec) -> std::io::Result<CommandOutput> {
83 let mut command = Command::new(&spec.program);
84 command.args(&spec.args);
85 if let Some(cwd) = &spec.cwd {
86 command.current_dir(cwd);
87 }
88 for (key, value) in &spec.env {
89 command.env(key, value);
90 }
91 let output = command.output()?;
92 Ok(CommandOutput {
93 status: output.status.code().unwrap_or(1),
94 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
95 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
96 })
97 }
98}
99
/// Readiness probes for the provisioned services; `provision_docker_services_with` calls each
/// method once after `docker compose up` succeeds and aborts on the first error.
pub trait DockerHealthChecker {
    /// Returns `Ok(())` once PostgreSQL at `host:port` is considered ready.
    fn wait_postgres(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
    /// Returns `Ok(())` once Qdrant at `host:port` is considered ready.
    fn wait_qdrant(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
    /// Returns `Ok(())` once FalkorDB at `host:port` is considered ready.
    fn wait_falkordb(&mut self, host: &str, port: u16) -> anyhow::Result<()>;
}
105
/// Health checker that probes services over plain TCP connections.
pub struct TcpDockerHealthChecker {
    /// Maximum number of probe attempts per service.
    pub retries: usize,
    /// Pause between two consecutive attempts.
    pub interval: Duration,
}

impl Default for TcpDockerHealthChecker {
    /// Up to 30 attempts, two seconds apart.
    fn default() -> Self {
        let retries = 30;
        let interval = Duration::from_millis(2_000);
        Self { retries, interval }
    }
}
119
120impl DockerHealthChecker for TcpDockerHealthChecker {
121 fn wait_postgres(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
122 wait_for_tcp(host, port, self.retries, self.interval)
123 .map_err(|err| anyhow::anyhow!("PostgreSQL did not become reachable: {err}"))
124 }
125
126 fn wait_qdrant(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
127 let healthz = || -> anyhow::Result<()> {
128 let mut stream = TcpStream::connect((host, port))?;
129 stream.set_read_timeout(Some(Duration::from_secs(3)))?;
130 stream.set_write_timeout(Some(Duration::from_secs(3)))?;
131 stream.write_all(b"GET /healthz HTTP/1.0\r\nHost: localhost\r\n\r\n")?;
132 let mut status_line = String::new();
133 std::io::BufReader::new(stream).read_line(&mut status_line)?;
134 if status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200") {
135 Ok(())
136 } else {
137 anyhow::bail!("unexpected Qdrant health response")
138 }
139 };
140 wait_for(healthz, self.retries, self.interval)
141 .map_err(|err| anyhow::anyhow!("Qdrant did not become healthy: {err}"))
142 }
143
144 fn wait_falkordb(&mut self, host: &str, port: u16) -> anyhow::Result<()> {
145 wait_for_tcp(host, port, self.retries, self.interval)
146 .map_err(|err| anyhow::anyhow!("FalkorDB did not become reachable: {err}"))
147 }
148}
149
150pub fn provision_docker_services(
151 options: &DockerServiceOptions,
152) -> anyhow::Result<DockerProvisioningReport> {
153 let mut runner = RealCommandRunner;
154 let mut health = TcpDockerHealthChecker::default();
155 provision_docker_services_with(options, &mut runner, &mut health)
156}
157
158pub fn provision_docker_services_with(
159 options: &DockerServiceOptions,
160 runner: &mut impl CommandRunner,
161 health: &mut impl DockerHealthChecker,
162) -> anyhow::Result<DockerProvisioningReport> {
163 let assets = prepare_service_assets(options)?;
164 let spec = docker_compose_up_spec(options, &assets.compose_file, &assets.services_dir);
165 let output = runner.run(&spec).map_err(|err| {
166 anyhow::anyhow!("failed to execute docker compose for standalone services: {err}")
167 })?;
168 if output.status != 0 {
169 anyhow::bail!(
170 "docker compose up failed: {}",
171 first_non_empty(&output.stderr, &output.stdout)
172 );
173 }
174
175 health.wait_postgres(DEFAULT_POSTGRES_HOST, options.postgres_port)?;
176 health.wait_qdrant(DEFAULT_QDRANT_HOST, options.qdrant_http_port)?;
177 health.wait_falkordb(&options.falkordb_host, options.falkordb_port)?;
178
179 Ok(DockerProvisioningReport {
180 services_dir: assets.services_dir,
181 compose_file: assets.compose_file,
182 env_file: assets.env_file,
183 started_profiles: vec!["all".to_string()],
184 health_checks: vec![
185 "postgres".to_string(),
186 "qdrant".to_string(),
187 "falkordb".to_string(),
188 ],
189 })
190}
191
192pub fn prepare_service_assets(
193 options: &DockerServiceOptions,
194) -> anyhow::Result<ServiceAssetReport> {
195 let services = services_dir(&options.gobby_home);
196 let compose = services.join(COMPOSE_FILENAME);
197 let pgsearch = services.join("postgres-pgsearch");
198 let env_file = services.join(".env");
199
200 fs::create_dir_all(pgsearch.join("initdb.d"))?;
201 fs::create_dir_all(pgsearch.join("scripts"))?;
202 fs::write(&compose, COMPOSE_TEMPLATE)?;
203 fs::write(pgsearch.join("Dockerfile"), PGSEARCH_DOCKERFILE)?;
204 fs::write(pgsearch.join("version.json"), PGSEARCH_VERSION)?;
205 fs::write(
206 pgsearch.join("initdb.d").join("01-pg_search.sql"),
207 PGSEARCH_INIT_PG_SEARCH,
208 )?;
209 fs::write(
210 pgsearch.join("initdb.d").join("02-pgaudit.sql"),
211 PGSEARCH_INIT_PGAUDIT,
212 )?;
213 let audit_script = pgsearch.join("scripts").join("pg_audit_export.sh");
214 fs::write(&audit_script, PG_AUDIT_EXPORT)?;
215 make_executable(&audit_script)?;
216
217 let manifest = pgsearch_manifest()?;
218 update_env_file(
219 &env_file,
220 BTreeMap::from([
221 (
222 "GOBBY_PG_SEARCH_VERSION".to_string(),
223 manifest.pg_search_version,
224 ),
225 ("GOBBY_PG_SEARCH_SHA256".to_string(), manifest.sha256),
226 (
227 "GOBBY_POSTGRES_PORT".to_string(),
228 options.postgres_port.to_string(),
229 ),
230 (
231 "GOBBY_POSTGRES_DB".to_string(),
232 DEFAULT_POSTGRES_DB.to_string(),
233 ),
234 (
235 "GOBBY_POSTGRES_USER".to_string(),
236 DEFAULT_POSTGRES_USER.to_string(),
237 ),
238 (
239 "GOBBY_POSTGRES_PASSWORD".to_string(),
240 DEFAULT_POSTGRES_PASSWORD.to_string(),
241 ),
242 (
243 "GOBBY_QDRANT_HTTP_PORT".to_string(),
244 options.qdrant_http_port.to_string(),
245 ),
246 (
247 "GOBBY_QDRANT_GRPC_PORT".to_string(),
248 options.qdrant_grpc_port.to_string(),
249 ),
250 (
251 "GOBBY_FALKORDB_PORT".to_string(),
252 options.falkordb_port.to_string(),
253 ),
254 (
255 "GOBBY_FALKORDB_BROWSER_PORT".to_string(),
256 options.falkordb_browser_port.to_string(),
257 ),
258 (
259 "GOBBY_FALKORDB_PASSWORD".to_string(),
260 options.falkordb_password.clone(),
261 ),
262 ]),
263 )?;
264
265 Ok(ServiceAssetReport {
266 services_dir: services,
267 compose_file: compose,
268 env_file,
269 postgres_asset_dir: pgsearch,
270 })
271}
272
273pub fn docker_compose_up_spec(
274 options: &DockerServiceOptions,
275 compose_file: &Path,
276 services_dir: &Path,
277) -> CommandSpec {
278 CommandSpec {
279 program: "docker".to_string(),
280 args: vec![
281 "compose".to_string(),
282 "-f".to_string(),
283 compose_file.display().to_string(),
284 "--profile".to_string(),
285 "all".to_string(),
286 "up".to_string(),
287 "-d".to_string(),
288 "--remove-orphans".to_string(),
289 ],
290 env: BTreeMap::from([
291 (
292 "GOBBY_FALKORDB_PASSWORD".to_string(),
293 options.falkordb_password.clone(),
294 ),
295 (
296 "GOBBY_POSTGRES_PORT".to_string(),
297 options.postgres_port.to_string(),
298 ),
299 (
300 "GOBBY_QDRANT_HTTP_PORT".to_string(),
301 options.qdrant_http_port.to_string(),
302 ),
303 ]),
304 cwd: Some(services_dir.to_path_buf()),
305 }
306}
307
/// Shape deserialized from the embedded `PGSEARCH_VERSION` JSON.
#[derive(Debug, Deserialize)]
struct PgSearchVersionFile {
    /// pg_search version string; written to `.env` as `GOBBY_PG_SEARCH_VERSION`.
    pg_search_version: String,
    /// Checksum used when no per-architecture entry matches.
    pg_search_sha256: String,
    /// Optional checksums keyed by the architecture name produced by `debian_arch`.
    pg_search_sha256_by_arch: Option<BTreeMap<String, String>>,
}
314
/// Version and checksum resolved for the current architecture by `pgsearch_manifest`.
struct PgSearchManifest {
    /// pg_search version string taken from the version file.
    pg_search_version: String,
    /// Checksum selected for this architecture, or the file's default checksum.
    sha256: String,
}
319
320fn pgsearch_manifest() -> anyhow::Result<PgSearchManifest> {
321 let parsed: PgSearchVersionFile = serde_json::from_str(PGSEARCH_VERSION)?;
322 let arch = debian_arch(std::env::consts::ARCH);
323 let sha256 = parsed
324 .pg_search_sha256_by_arch
325 .and_then(|by_arch| by_arch.get(&arch).cloned())
326 .unwrap_or(parsed.pg_search_sha256);
327 Ok(PgSearchManifest {
328 pg_search_version: parsed.pg_search_version,
329 sha256,
330 })
331}
332
/// Maps an architecture name to its Debian spelling (`amd64` / `arm64`); any other name is
/// returned unchanged.
fn debian_arch(arch: &str) -> String {
    let normalized = if matches!(arch, "x86_64" | "amd64") {
        "amd64"
    } else if matches!(arch, "aarch64" | "arm64") {
        "arm64"
    } else {
        arch
    };
    normalized.to_owned()
}
340
341fn update_env_file(path: &Path, updates: BTreeMap<String, String>) -> anyhow::Result<()> {
342 if let Some(parent) = path.parent() {
343 fs::create_dir_all(parent)?;
344 }
345 let mut lines = Vec::new();
346 if path.exists() {
347 for line in fs::read_to_string(path)?.lines() {
348 let key = line.split_once('=').map(|(key, _)| key).unwrap_or(line);
349 if !updates.contains_key(key) {
350 lines.push(line.to_string());
351 }
352 }
353 if lines.last().is_some_and(|line| !line.trim().is_empty()) {
354 lines.push(String::new());
355 }
356 }
357 for (key, value) in updates {
358 lines.push(format!("{key}={value}"));
359 }
360 fs::write(path, format!("{}\n", lines.join("\n")))?;
361 Ok(())
362}
363
/// Returns `first` trimmed, unless it is blank, in which case `second` trimmed is returned.
fn first_non_empty<'a>(first: &'a str, second: &'a str) -> &'a str {
    match first.trim() {
        "" => second.trim(),
        trimmed => trimmed,
    }
}
371
372fn wait_for_tcp(host: &str, port: u16, retries: usize, interval: Duration) -> anyhow::Result<()> {
373 wait_for(
374 || {
375 TcpStream::connect((host, port))
376 .map(|_| ())
377 .map_err(Into::into)
378 },
379 retries,
380 interval,
381 )
382}
383
384fn wait_for(
385 mut check: impl FnMut() -> anyhow::Result<()>,
386 retries: usize,
387 interval: Duration,
388) -> anyhow::Result<()> {
389 if retries == 0 {
390 anyhow::bail!("health check failed without retries");
391 }
392 let mut last_error = None;
393 for attempt in 0..retries {
394 match check() {
395 Ok(()) => return Ok(()),
396 Err(err) => last_error = Some(err),
397 }
398 if attempt + 1 < retries {
399 std::thread::sleep(interval);
400 }
401 }
402 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("health check failed")))
403}
404
405fn make_executable(path: &Path) -> anyhow::Result<()> {
406 #[cfg(unix)]
407 {
408 use std::os::unix::fs::PermissionsExt;
409 let mut permissions = fs::metadata(path)?.permissions();
410 permissions.set_mode(0o755);
411 fs::set_permissions(path, permissions)?;
412 }
413 #[cfg(not(unix))]
414 {
415 let _ = path;
416 }
417 Ok(())
418}