apt_swarm/p2p/
db.rs

1use crate::db::proto::{ErrorResponse, Query, Response};
2use crate::db::{DatabaseClient, DatabaseServerClient};
3use crate::errors::*;
4use crate::sync;
5use std::convert::Infallible;
6use std::path::PathBuf;
7use tokio::fs;
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::{UnixListener, UnixStream};
10
11pub async fn serve_request(db: &mut DatabaseServerClient, buf: &[u8]) -> Result<Response> {
12    let query = serde_json::from_slice(buf).context("Failed to deserialize query")?;
13    match query {
14        Query::AddRelease(fp, signed) => {
15            let fp = fp.parse().context("Failed to parse fingerprint")?;
16            let hash = db.add_release(&fp, &signed).await?;
17            Ok(Response::Inserted(hash))
18        }
19        Query::IndexFromScan(query) => {
20            let fp = query.fp.parse().context("Failed to parse fingerprint")?;
21            let index = db
22                .index_from_scan(&sync::TreeQuery {
23                    fp,
24                    hash_algo: query.hash_algo,
25                    prefix: query.prefix,
26                })
27                .await?;
28            Ok(Response::Index(index))
29        }
30        Query::Count(key) => {
31            let count = db.count(&key).await?;
32            Ok(Response::Num(count))
33        }
34    }
35}
36
37pub async fn serve_db_client(db: &mut DatabaseServerClient, mut stream: UnixStream) -> Result<()> {
38    let (rx, mut tx) = stream.split();
39    let mut reader = BufReader::new(rx);
40
41    let mut buf = Vec::new();
42    loop {
43        buf.clear();
44        reader
45            .read_until(b'\n', &mut buf)
46            .await
47            .context("Failed to read from database client")?;
48        if !buf.ends_with(b"\n") {
49            // client has disconnected
50            return Ok(());
51        }
52
53        match serve_request(db, &buf).await {
54            Ok(Response::Ok) => tx.write_all(b"\n").await?,
55            Ok(response) => {
56                let mut err = serde_json::to_string(&response)?;
57                err.push('\n');
58                tx.write_all(err.as_bytes()).await?;
59            }
60            Err(err) => {
61                let err = ErrorResponse::new(&err);
62                let mut err = serde_json::to_string(&err)?;
63                err.push('\n');
64                tx.write_all(err.as_bytes()).await?;
65            }
66        }
67    }
68}
69
70pub async fn spawn_unix_db_server(db: &DatabaseServerClient, path: PathBuf) -> Result<Infallible> {
71    fs::remove_file(&path).await.ok();
72    let listener = UnixListener::bind(&path)
73        .with_context(|| anyhow!("Failed to bind database socket at: {path:?}"))?;
74    debug!("Bound database socket at {path:?}");
75
76    loop {
77        let (stream, _src_addr) = listener.accept().await?;
78        debug!("Accepted database client on unix domain socket");
79
80        let mut db = db.clone();
81        tokio::spawn(async move {
82            if let Err(err) = serve_db_client(&mut db, stream).await {
83                error!("Error while serving database client: {err:#}");
84            } else {
85                debug!("Database client disconnected");
86            }
87        });
88    }
89}