g1-sqlite-connection 0.1.0-alpha.3

A G1 connection based on an SQLite database, using the FS for blobs.
use crate::{Atom, Command, SqliteError};
use g1_common::naive_solve::naive_solve;
use log::error;
use rusqlite::{Connection, NO_PARAMS};
use std::sync::Arc;
use tokio::sync::oneshot::Sender;

impl Command {
    pub(crate) fn run(self, conn: &mut Connection) {
        match self {
            Command::CreateAtom(send) => {
                with_sender(send, || {
                    let mut retries = 3;
                    loop {
                        let atom = Atom::new();
                        match conn.execute("insert into atoms values (?)", &[atom.to_string()]) {
                            Ok(_) => break Ok(atom),
                            Err(rusqlite::Error::SqliteFailure(
                                rusqlite::ffi::Error {
                                    code: rusqlite::ErrorCode::ConstraintViolation,
                                    extended_code: 2067,
                                },
                                _,
                            )) if retries > 0 => {
                                retries -= 1;
                                error!("Failed to create atom; check your entropy")
                            }
                            Err(e) => break Err(e.into()),
                        }
                    }
                });
            }
            Command::DeleteAtom(atom, send) => with_sender(send, move || {
                let tx = conn.transaction()?;

                let _ = tx.execute("delete from names where atom = ?", &[atom.to_string()])?;
                let _ = tx.execute("delete from edges where edge_from = ?", &[atom.to_string()])?;
                let _ = tx.execute("delete from edges where edge_to = ?", &[atom.to_string()])?;
                let _ = tx.execute("delete from tags where atom = ?", &[atom.to_string()])?;
                let _ = tx.execute("delete from blobs where atom = ?", &[atom.to_string()])?;

                tx.finish()?;
                Ok(())
            }),
            Command::CreateName(atom, ns, title, true, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert or replace into names values (?, ?, ?)",
                        &[atom.to_string(), ns, title],
                    )?;
                    Ok(())
                });
            }
            Command::CreateName(atom, ns, title, false, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert into names values (?, ?, ?)",
                        &[atom.to_string(), ns, title],
                    )?;
                    Ok(())
                });
            }
            Command::DeleteName(ns, title, send) => with_sender(send, move || {
                conn.execute("delete from names where ns = ? and title = ?", &[ns, title])
                    .map(|n| match n {
                        0 => false,
                        1 => true,
                        n => {
                            error!("unexpected result from deleting name: {}", n);
                            true
                        }
                    })
                    .map_err(From::from)
            }),
            Command::CreateEdge(from, to, label, send) => {
                with_sender(send, move || {
                    match conn.execute(
                        "insert into edges values (?, ?, ?)",
                        &[from.to_string(), to.to_string(), label],
                    ) {
                        Ok(_) => Ok(false),
                        Err(rusqlite::Error::SqliteFailure(
                            rusqlite::ffi::Error {
                                code: rusqlite::ErrorCode::ConstraintViolation,
                                extended_code: 2067,
                            },
                            _,
                        )) => Ok(true),
                        Err(e) => Err(e.into()),
                    }
                });
            }
            Command::DeleteEdge(from, to, label, send) => with_sender(send, move || {
                conn.execute(
                    "delete from edges where edge_from = ? and edge_to = ? and label = ?",
                    &[from.to_string(), to.to_string(), label],
                )
                .map(|n| match n {
                    0 => false,
                    1 => true,
                    n => {
                        error!("unexpected result from deleting edge: {}", n);
                        true
                    }
                })
                .map_err(From::from)
            }),
            Command::CreateTag(atom, key, value, true, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert or replace into tags values (?, ?, ?)",
                        &[atom.to_string(), key, value],
                    )?;
                    Ok(())
                });
            }
            Command::CreateTag(atom, key, value, false, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert into tags values (?, ?, ?)",
                        &[atom.to_string(), key, value],
                    )?;
                    Ok(())
                });
            }
            Command::DeleteTag(atom, key, send) => with_sender(send, move || {
                conn.execute(
                    "delete from tags where atom = ? and key = ?",
                    &[atom.to_string(), key],
                )
                .map(|n| match n {
                    0 => false,
                    1 => true,
                    n => {
                        error!("unexpected result from deleting tag: {}", n);
                        true
                    }
                })
                .map_err(From::from)
            }),
            Command::CreateBlob(atom, kind, mime, hash, true, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert or replace into blobs values (?, ?, ?, ?)",
                        &[atom.to_string(), kind, mime.to_string(), hash.to_string()],
                    )?;
                    Ok(())
                });
            }
            Command::CreateBlob(atom, kind, mime, hash, false, send) => {
                with_sender(send, move || {
                    let _ = conn.execute(
                        "insert into blobs values (?, ?, ?, ?)",
                        &[atom.to_string(), kind, mime.to_string(), hash.to_string()],
                    )?;
                    Ok(())
                });
            }
            Command::DeleteBlob(atom, kind, mime, send) => with_sender(send, move || {
                conn.execute(
                    "delete from blobs where atom = ? and kind = ? and mime = ?",
                    &[atom.to_string(), kind, mime.to_string()],
                )
                .map(|n| match n {
                    0 => false,
                    1 => true,
                    n => {
                        error!("unexpected result from deleting blob: {}", n);
                        true
                    }
                })
                .map_err(From::from)
            }),
            Command::Query(limit, query, send) => {
                with_sender(send, move || {
                    let tx = conn.transaction()?;

                    let atoms = tx
                        .prepare("select atom from atoms")?
                        .query_and_then(NO_PARAMS, |row| Ok(Arc::from(row.get::<_, String>(0)?)))?
                        .collect::<Result<Vec<_>, SqliteError>>()?;

                    let names = tx
                        .prepare("select atom, ns, title from names")?
                        .query_and_then(NO_PARAMS, |row| {
                            Ok((
                                Arc::from(row.get::<_, String>(0)?),
                                Arc::from(row.get::<_, String>(1)?),
                                Arc::from(row.get::<_, String>(2)?),
                            ))
                        })?
                        .collect::<Result<Vec<_>, SqliteError>>()?;

                    let edges = tx
                        .prepare("select edge_from, edge_to, label from edges")?
                        .query_and_then(NO_PARAMS, |row| {
                            Ok((
                                Arc::from(row.get::<_, String>(0)?),
                                Arc::from(row.get::<_, String>(1)?),
                                Arc::from(row.get::<_, String>(2)?),
                            ))
                        })?
                        .collect::<Result<Vec<_>, SqliteError>>()?;

                    let tags = tx
                        .prepare("select atom, key, value from tags")?
                        .query_and_then(NO_PARAMS, |row| {
                            Ok((
                                Arc::from(row.get::<_, String>(0)?),
                                Arc::from(row.get::<_, String>(1)?),
                                Arc::from(row.get::<_, String>(2)?),
                            ))
                        })?
                        .collect::<Result<Vec<_>, SqliteError>>()?;

                    let blobs = tx
                        .prepare("select atom, kind, mime, hash from blobs")?
                        .query_and_then(NO_PARAMS, |row| {
                            Ok((
                                Arc::from(row.get::<_, String>(0)?),
                                Arc::from(row.get::<_, String>(1)?),
                                Arc::from(row.get::<_, String>(2)?),
                                Arc::from(row.get::<_, String>(3)?),
                            ))
                        })?
                        .collect::<Result<Vec<_>, SqliteError>>()?;

                    tx.finish()?;

                    Ok(naive_solve(
                        &atoms, &names, &edges, &tags, &blobs, limit, &query,
                    ))
                });
            }
        }
    }
}

fn with_sender<F, T>(send: Sender<Result<T, SqliteError>>, func: F)
where
    F: FnOnce() -> Result<T, SqliteError>,
{
    let _ = send.send(func());
}