rustolio-db 0.1.0

A DB extension for the rustolio HTTP server
Documentation
//
// SPDX-License-Identifier: MPL-2.0
//
// Copyright (c) 2026 Tobias Binnewies. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//

mod disk;
mod memory;

use std::{future::Future, path::PathBuf, pin::Pin};

use rustolio_utils::{crypto::signature::PublicKey, prelude::*};
use tokio::sync::{mpsc, oneshot};

use super::{Key, Result, Value};

use memory::StoreController;

/// Boxed, one-shot store operation: it is called with exclusive access to a
/// [`StoreController`] and returns a [`PinnedFuture`] that may borrow that
/// controller for as long as the future lives.
type HandleFn<R> =
    Box<dyn for<'a> FnOnce(&'a mut StoreController) -> PinnedFuture<'a, R> + Send + Sync + 'static>;
/// Boxed, pinned, `Send` future resolving to `Result<R>`; it may borrow data for `'a`.
type PinnedFuture<'a, R> = Pin<Box<dyn Future<Output = Result<R>> + Send + 'a>>;
/// Type-erased boxed value; `create_handle_fn` uses it to hide the concrete
/// result type of an operation.
type BoxAny = Box<dyn std::any::Any + Send + Sync + 'static>;

/// Builds a store operation that looks up `key`.
///
/// When the operation is run, `key` and `signer` are forwarded unchanged to
/// `StoreController::get` and its result is returned as-is.
pub fn get(key: Key, signer: Option<PublicKey>) -> HandleFn<Option<Value>> {
    Box::new(move |store: &mut StoreController| {
        let lookup = async move { store.get(key, signer).await };
        Box::pin(lookup)
    })
}

/// Builds a store operation that writes `value` under `key`.
///
/// Whatever `StoreController::set` returns on success is discarded; the
/// operation itself resolves to `()`. Errors are propagated.
pub fn set(key: Key, value: Value) -> HandleFn<()> {
    Box::new(move |store: &mut StoreController| {
        // Kept inline so the `?` below gets its target type from the
        // expected `PinnedFuture` return type.
        Box::pin(async move {
            store.set(key, value).await?;
            Ok(())
        })
    })
}

/// Builds a store operation that replaces the value under `key` and returns
/// the value that was readable before the write.
///
/// The read is performed with the signer of the new `value`; the write only
/// happens if the read succeeded.
pub fn getset(key: Key, value: Value) -> HandleFn<Option<Value>> {
    Box::new(move |store: &mut StoreController| {
        Box::pin(async move {
            // Read first, using the signer of the incoming value.
            let signer = value.signer();
            let previous = store.get(key, signer).await?;

            // Then overwrite and hand back what was there before.
            store.set(key, value).await?;
            Ok(previous)
        })
    })
}

/// Handle to the key/value store.
///
/// It only holds the sending halves of two channels; the receiving halves are
/// handed to `memory::Store::init` and `disk::Store::init` in [`Service::new`].
#[derive(Debug, service::Service)]
pub struct Service {
    /// Sender for actions addressed to the in-memory store.
    mem_channel: mpsc::Sender<memory::Action>,
    /// Sender for actions addressed to the on-disk store.
    disk_channel: mpsc::Sender<disk::Action>,
}

impl Service {
    /// Creates the channels for both store layers and initialises them.
    ///
    /// `disk_path` and the disk receiver go to `disk::Store::init`;
    /// `max_mem_usage`, the memory receiver and a clone of the disk sender go
    /// to `memory::Store::init`. The disk store is initialised first.
    pub fn new(max_mem_usage: usize, disk_path: PathBuf) -> Self {
        // Both action queues are bounded to the same number of pending messages.
        const CHANNEL_CAPACITY: usize = 20;

        let (disk_channel, disk_rx) = mpsc::channel(CHANNEL_CAPACITY);
        let (mem_channel, mem_rx) = mpsc::channel(CHANNEL_CAPACITY);

        disk::Store::init(disk_path, disk_rx);
        memory::Store::init(max_mem_usage, mem_rx, disk_channel.clone());

        Self {
            mem_channel,
            disk_channel,
        }
    }
}

#[service_impl]
impl StoreReplication for AsRef<Service> {
    async fn store_replication(&'static self, key: Key, value: Value) -> Result<()> {
        if self
            .as_ref()
            .disk_channel
            .send(disk::Action::Set(key, value))
            .await
            .is_err()
        {
            return Err(crate::Error::StoreClosed);
        }
        Ok(())
    }
}

/// Erases the result type of a store operation.
///
/// The returned operation runs `f` unchanged and, on success, boxes its
/// result as a [`BoxAny`]; errors pass through untouched.
fn create_handle_fn<R: Threadsafe>(f: HandleFn<R>) -> HandleFn<BoxAny> {
    Box::new(move |controller: &mut StoreController| {
        let typed = f(controller);
        let erased = async move {
            match typed.await {
                Ok(value) => Ok(Box::new(value) as BoxAny),
                Err(err) => Err(err),
            }
        };
        Box::pin(erased)
    })
}

#[service_impl]
impl OperateStore for AsRef<Service> {
    async fn handle<R: Threadsafe>(&'static self, f: HandleFn<R>) -> Result<R> {
        let f = create_handle_fn(f);
        let (tx, rx) = oneshot::channel();
        if self
            .as_ref()
            .mem_channel
            .send(memory::Action::Update(f, tx))
            .await
            .is_err()
        {
            return Err(crate::Error::StoreClosed);
        }
        let any = rx.await.unwrap()?;
        Ok(unsafe {
            // SAFETY: Will always be the expcted type
            *any.downcast().unwrap_unchecked()
        })
    }

    async fn shutdown(&'static self) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        self.as_ref()
            .mem_channel
            .send(memory::Action::Shutdown(tx))
            .await
            .expect("Store already shut down");
        rx.await.unwrap();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{test_utils::file::generate_dir, KeyType};

    use super::*;

    // Exercises `get`, `set` and `getset` through `handle` on a fresh store.
    #[tokio::test]
    async fn test_store_operation() {
        let (dir, _guard) = generate_dir("store_operation");

        let kv_store = Service::new(5, dir);
        service!(kv_store as Service);

        let key = Key::from_value(&0, KeyType::ReadWrite);
        let value = Value::from_value(&"foo").unwrap();

        // Nothing has been stored under the key yet.
        let get_op = get(key, None);
        let res = kv_store.handle(get_op).await.unwrap();
        assert_eq!(res, None);

        let set_op = set(key, value.clone());
        kv_store.handle(set_op).await.unwrap();
        // `handle` returns instantly, so wait for the value to actually be set.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let get_op = get(key, None);
        let res = kv_store.handle(get_op).await.unwrap();
        assert_eq!(res, Some(value.clone()));

        // `getset` must return the previous value and store the new one.
        let new_value = Value::from_value(&"bar").unwrap();
        let getset_op = getset(key, new_value.clone());
        let res = kv_store.handle(getset_op).await.unwrap();
        assert_eq!(res, Some(value));
        // `handle` returns instantly, so wait for the value to actually be set.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let get_op = get(key, None);
        let res = kv_store.handle(get_op).await.unwrap();
        assert_eq!(res, Some(new_value));

        // Wait before dropping the dir guard: removing the directory sometimes
        // failed, presumably because the disk store created a file while the
        // directory was being removed.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Checks that a value written via `store_replication` can be read back
    // through `handle`.
    #[tokio::test]
    async fn test_store_replication() {
        let (dir, _guard) = generate_dir("store_replication");

        let kv_store = Service::new(5, dir);
        service!(kv_store as Service);

        let key = Key::from_value(&0, KeyType::ReadWrite);
        let value = Value::from_value(&"foo").unwrap();

        let get_op = get(key, None);
        let res = kv_store.handle(get_op).await.unwrap();
        assert_eq!(res, None);

        // `store_replication` only sends to the disk channel, while the `get`
        // below goes through the memory channel.
        // NOTE(review): presumably the memory store falls back to the disk
        // store on a miss - confirm.
        kv_store
            .store_replication(key, value.clone())
            .await
            .unwrap();
        // `store_replication` returns instantly, so wait for the value to actually be set.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let get_op = get(key, None);
        let res = kv_store.handle(get_op).await.unwrap();
        assert_eq!(res.unwrap(), value);
    }
}