groupcache 0.3.0

groupcache is a distributed caching and cache-filling library, intended as a replacement for a pool of memcached nodes in many cases. It shards by key to select which peer is responsible for that key.
Documentation
#![allow(dead_code)]

use anyhow::anyhow;
use anyhow::Result;
use async_trait::async_trait;
use groupcache::Groupcache;
use moka::future::CacheBuilder;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::future::{pending, Future};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::codegen::tokio_stream;
use tonic::transport::Server;

pub static OS_ALLOCATED_PORT_ADDR: &str = "127.0.0.1:0";
pub static HOT_CACHE_TTL: Duration = Duration::from_millis(100);

pub fn key_owned_by_instance(instance: TestGroupcache) -> String {
    format!("{}_0", instance.addr())
}

pub fn error_key_on_instance(instance: TestGroupcache) -> String {
    format!("{}_13", instance.addr())
}

pub async fn two_instances() -> Result<(TestGroupcache, TestGroupcache)> {
    let instance_one = spawn_groupcache("1").await?;
    let instance_two = spawn_groupcache("2").await?;

    Ok((instance_one, instance_two))
}

pub async fn spawn_instances(n: usize) -> Result<Vec<TestGroupcache>> {
    let mut instances = Vec::new();
    for i in 0..n {
        let instance = spawn_groupcache(&i.to_string()).await?;
        instances.push(instance);
    }

    let first_instance = &instances[0];
    for instance in instances.iter().skip(1) {
        first_instance.add_peer(instance.addr().into()).await?;
    }

    Ok(instances)
}

pub async fn single_instance() -> Result<TestGroupcache> {
    spawn_groupcache("1").await
}

pub async fn two_connected_instances() -> Result<(TestGroupcache, TestGroupcache)> {
    let (instance_one, instance_two) = two_instances().await?;

    instance_one.add_peer(instance_two.addr().into()).await?;
    instance_two.add_peer(instance_one.addr().into()).await?;

    Ok((instance_one, instance_two))
}

pub async fn reconnect(instance: TestGroupcache) {
    let listener = TcpListener::bind(instance.addr()).await.unwrap();
    tokio::spawn(async move {
        Server::builder()
            .add_service(instance.grpc_service())
            .serve_with_incoming(TcpListenerStream::new(listener))
            .await
            .unwrap();
    });
}

pub async fn two_instances_with_one_disconnected() -> Result<(TestGroupcache, TestGroupcache)> {
    let (shutdown_signal, shutdown_recv) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_done_s, shutdown_done_r) = tokio::sync::oneshot::channel::<()>();
    pub async fn shutdown_proxy(shutdown_signal: Receiver<()>, shutdown_done: Sender<()>) {
        shutdown_signal.await.unwrap();
        shutdown_done.send(()).unwrap();
    }

    let instance_one = spawn_groupcache("1").await?;
    let instance_two = spawn_groupcache_instance(
        "2",
        OS_ALLOCATED_PORT_ADDR,
        shutdown_proxy(shutdown_recv, shutdown_done_s),
    )
    .await?;

    instance_one.add_peer(instance_two.addr().into()).await?;
    instance_two.add_peer(instance_one.addr().into()).await?;

    shutdown_signal.send(()).unwrap();
    shutdown_done_r.await.unwrap();

    Ok((instance_one, instance_two))
}

pub async fn spawn_groupcache(instance_id: &str) -> Result<TestGroupcache> {
    spawn_groupcache_instance(instance_id, OS_ALLOCATED_PORT_ADDR, pending()).await
}

pub async fn spawn_groupcache_instance(
    instance_id: &str,
    addr: &str,
    shutdown_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<TestGroupcache> {
    let listener = TcpListener::bind(addr).await.unwrap();
    let addr = listener.local_addr()?;
    let groupcache = Groupcache::builder(addr.into(), TestCacheLoader::new(instance_id))
        .hot_cache(CacheBuilder::default().time_to_live(HOT_CACHE_TTL).build())
        .build();

    let server = groupcache.grpc_service();
    tokio::spawn(async move {
        Server::builder()
            .add_service(server)
            .serve_with_incoming_shutdown(TcpListenerStream::new(listener), shutdown_signal)
            .await
            .unwrap();
    });

    Ok(groupcache)
}

pub async fn success_or_transport_err(key: &str, groupcache: TestGroupcache) {
    let result = groupcache.get(key).await;
    match result {
        Ok(v) => {
            assert_eq!(v.contains(key), true);
        }
        Err(e) => {
            let error_string = e.to_string();
            assert_eq!(
                error_string.contains("Transport"),
                true,
                "expected transport error, got: '{}'",
                error_string
            );
        }
    }
}

#[derive(Default)]
pub struct GetAssertions {
    pub expected_instance_id: Option<String>,
    pub unexpected_instance_id: Option<String>,
    pub expected_load_count: Option<i32>,
}

pub async fn successful_get_not_from_instance(
    key: &str,
    unexpected_instance_id: &str,
    groupcache: TestGroupcache,
) {
    let opts = GetAssertions {
        unexpected_instance_id: Some(unexpected_instance_id.to_string()),
        ..GetAssertions::default()
    };

    successful_get_opts(key, groupcache, opts).await;
}

pub async fn successful_get(
    key: &str,
    expected_instance_id: Option<&str>,
    groupcache: TestGroupcache,
) {
    let opts = GetAssertions {
        expected_instance_id: expected_instance_id.map(|s| s.to_string()),
        ..GetAssertions::default()
    };

    successful_get_opts(key, groupcache, opts).await;
}

pub async fn successful_get_opts(key: &str, groupcache: TestGroupcache, opts: GetAssertions) {
    let v = groupcache.get(key).await.expect("get should be successful");

    assert_eq!(
        v.contains(key),
        true,
        "expected value to be '{}', got: '{}'",
        key,
        v
    );
    if let Some(instance) = opts.expected_instance_id {
        assert_eq!(
            v.contains(&format!("INSTANCE_{}", instance)),
            true,
            "expected instance id to be '{}', got: '{}'",
            instance,
            v
        );
    }

    if let Some(load) = opts.expected_load_count {
        assert_eq!(
            v.contains(&format!("LOAD_{}", load)),
            true,
            "expected load count to be '{}', got: '{}'",
            load,
            v,
        );
    }
}

pub type TestGroupcache = Groupcache<CachedValue>;

pub type CachedValue = String;

pub struct TestCacheLoader {
    instance_id: String,
    load_counter: Arc<RwLock<HashMap<String, i32>>>,
}

impl TestCacheLoader {
    pub fn new(instance_id: &str) -> Self {
        Self {
            instance_id: instance_id.to_string(),
            load_counter: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn count_loads(&self, key: &str) -> Result<i32> {
        let mut lock = self.load_counter.write().unwrap();
        let counter = lock.entry(key.to_string()).or_insert(0);
        *counter += 1;

        Ok(*counter)
    }
}

#[async_trait]
impl groupcache::ValueLoader for TestCacheLoader {
    type Value = CachedValue;

    async fn load(
        &self,
        key: &str,
    ) -> std::result::Result<Self::Value, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let load_counter = self.count_loads(key)?;
        return if !key.contains("error") && !key.contains("_13") {
            Ok(format!(
                "VAL_INSTANCE_{}_KEY_{}_LOAD_{}",
                self.instance_id, key, load_counter
            ))
        } else {
            Err(anyhow!("Something bad happened during loading :/").into())
        };
    }
}