1use crate::db::proto::{ErrorResponse, Query, Response};
2use crate::db::{DatabaseClient, DatabaseServerClient};
3use crate::errors::*;
4use crate::sync;
5use std::convert::Infallible;
6use std::path::PathBuf;
7use tokio::fs;
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::{UnixListener, UnixStream};
10
11pub async fn serve_request(db: &mut DatabaseServerClient, buf: &[u8]) -> Result<Response> {
12 let query = serde_json::from_slice(buf).context("Failed to deserialize query")?;
13 match query {
14 Query::AddRelease(fp, signed) => {
15 let fp = fp.parse().context("Failed to parse fingerprint")?;
16 let hash = db.add_release(&fp, &signed).await?;
17 Ok(Response::Inserted(hash))
18 }
19 Query::IndexFromScan(query) => {
20 let fp = query.fp.parse().context("Failed to parse fingerprint")?;
21 let index = db
22 .index_from_scan(&sync::TreeQuery {
23 fp,
24 hash_algo: query.hash_algo,
25 prefix: query.prefix,
26 })
27 .await?;
28 Ok(Response::Index(index))
29 }
30 Query::Count(key) => {
31 let count = db.count(&key).await?;
32 Ok(Response::Num(count))
33 }
34 }
35}
36
37pub async fn serve_db_client(db: &mut DatabaseServerClient, mut stream: UnixStream) -> Result<()> {
38 let (rx, mut tx) = stream.split();
39 let mut reader = BufReader::new(rx);
40
41 let mut buf = Vec::new();
42 loop {
43 buf.clear();
44 reader
45 .read_until(b'\n', &mut buf)
46 .await
47 .context("Failed to read from database client")?;
48 if !buf.ends_with(b"\n") {
49 return Ok(());
51 }
52
53 match serve_request(db, &buf).await {
54 Ok(Response::Ok) => tx.write_all(b"\n").await?,
55 Ok(response) => {
56 let mut err = serde_json::to_string(&response)?;
57 err.push('\n');
58 tx.write_all(err.as_bytes()).await?;
59 }
60 Err(err) => {
61 let err = ErrorResponse::new(&err);
62 let mut err = serde_json::to_string(&err)?;
63 err.push('\n');
64 tx.write_all(err.as_bytes()).await?;
65 }
66 }
67 }
68}
69
70pub async fn spawn_unix_db_server(db: &DatabaseServerClient, path: PathBuf) -> Result<Infallible> {
71 fs::remove_file(&path).await.ok();
72 let listener = UnixListener::bind(&path)
73 .with_context(|| anyhow!("Failed to bind database socket at: {path:?}"))?;
74 debug!("Bound database socket at {path:?}");
75
76 loop {
77 let (stream, _src_addr) = listener.accept().await?;
78 debug!("Accepted database client on unix domain socket");
79
80 let mut db = db.clone();
81 tokio::spawn(async move {
82 if let Err(err) = serve_db_client(&mut db, stream).await {
83 error!("Error while serving database client: {err:#}");
84 } else {
85 debug!("Database client disconnected");
86 }
87 });
88 }
89}