use kitsune2_api::*;
use std::sync::{Arc, Mutex};
mod config {
    /// Configuration parameters for the in-memory bootstrap.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
    #[serde(rename_all = "camelCase")]
    pub struct MemBootstrapConfig {
        /// Identifier selecting which in-process store an instance reads
        /// from and writes to.
        ///
        /// Defaults to the debug rendering of the id of the thread that
        /// constructs the config.
        pub test_id: String,

        /// Delay, in milliseconds, between two polls of the store.
        ///
        /// Default: 5000.
        #[cfg_attr(feature = "schema", schemars(default))]
        pub poll_freq_ms: u32,
    }

    impl Default for MemBootstrapConfig {
        fn default() -> Self {
            // Derive the id from the constructing thread so that configs
            // created on different threads default to different stores.
            let test_id = format!("{:?}", std::thread::current().id());
            Self {
                test_id,
                poll_freq_ms: 5000,
            }
        }
    }

    /// Module-level configuration wrapper holding the in-memory bootstrap
    /// settings under the `memBootstrap` key when serialized.
    #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
    #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
    #[serde(rename_all = "camelCase")]
    pub struct MemBootstrapModConfig {
        /// Settings for the in-memory bootstrap.
        pub mem_bootstrap: MemBootstrapConfig,
    }
}

pub use config::*;
/// Factory producing in-memory bootstrap instances.
///
/// The type carries no state; use [`MemBootstrapFactory::create`] to obtain it
/// as a `DynBootstrapFactory`.
#[derive(Debug)]
pub struct MemBootstrapFactory {}
impl MemBootstrapFactory {
    /// Construct a new factory, returned as a type-erased
    /// `DynBootstrapFactory`.
    pub fn create() -> DynBootstrapFactory {
        Arc::new(Self {})
    }

    /// Wake every polling task that is currently waiting between polls so it
    /// polls again right away instead of sleeping out its interval.
    ///
    /// The notifier is process-wide, so this is not limited to one test id.
    pub fn trigger_immediate_poll() {
        NOTIFY.notify_waiters();
    }
}
impl BootstrapFactory for MemBootstrapFactory {
fn default_config(&self, config: &mut Config) -> K2Result<()> {
config.set_module_config(&MemBootstrapModConfig::default())
}
fn validate_config(&self, _config: &Config) -> K2Result<()> {
Ok(())
}
fn create(
&self,
builder: Arc<Builder>,
peer_store: DynPeerStore,
_space: SpaceId,
) -> BoxFut<'static, K2Result<DynBootstrap>> {
Box::pin(async move {
let config: MemBootstrapModConfig =
builder.config.get_module_config()?;
let out: DynBootstrap =
Arc::new(MemBootstrap::new(config.mem_bootstrap, peer_store));
Ok(out)
})
}
}
/// A bootstrap instance backed by the process-wide in-memory store.
#[derive(Debug)]
struct MemBootstrap {
/// Key of this instance's store in the global map.
test_id: Arc<str>,
/// Handle of the background polling task spawned in `new`.
task: tokio::task::JoinHandle<()>,
}
impl Drop for MemBootstrap {
// Abort the background polling task; merely dropping a tokio `JoinHandle`
// would leave the task running.
fn drop(&mut self) {
self.task.abort();
}
}
impl MemBootstrap {
    /// Create a bootstrap instance and spawn its background polling task.
    ///
    /// The task repeatedly takes a snapshot of the store selected by
    /// `config.test_id`, inserts it into `peer_store`, and then waits until
    /// either `config.poll_freq_ms` milliseconds have elapsed or an immediate
    /// poll is triggered through the global notifier.
    ///
    /// The task is spawned with `tokio::task::spawn`, so this must be called
    /// from within a tokio runtime.
    pub fn new(config: MemBootstrapConfig, peer_store: DynPeerStore) -> Self {
        let poll_freq =
            std::time::Duration::from_millis(u64::from(config.poll_freq_ms));
        let test_id: Arc<str> = Arc::from(config.test_id);
        let task_test_id = Arc::clone(&test_id);

        let task = tokio::task::spawn(async move {
            loop {
                let info_list = stat_process(Arc::clone(&task_test_id), None);
                // A failed insert panics, which ends this polling task.
                peer_store.insert(info_list).await.unwrap();

                // Whichever completes first starts the next poll.
                tokio::select! {
                    _ = tokio::time::sleep(poll_freq) => (),
                    _ = NOTIFY.notified() => (),
                }
            }
        });

        Self { test_id, task }
    }
}
impl Bootstrap for MemBootstrap {
    /// Record `info` in this instance's store, replacing any stored entry
    /// for the same agent.
    fn put(&self, info: Arc<AgentInfoSigned>) {
        // Only the store update matters here; the returned snapshot is
        // dropped immediately.
        stat_process(Arc::clone(&self.test_id), Some(info));
    }
}
// Process-wide notifier: signalled by `MemBootstrapFactory::trigger_immediate_poll`
// and awaited by each polling task alongside its sleep timer.
static NOTIFY: tokio::sync::Notify = tokio::sync::Notify::const_new();
// The agent infos recorded under a single test id.
type Store = Vec<Arc<AgentInfoSigned>>;
// All stores, keyed by test id.
type Map = std::collections::HashMap<Arc<str>, Store>;
// Lazily initialized, mutex-guarded global map; within this file it is only
// accessed through `stat_process`.
static STAT: std::sync::OnceLock<Mutex<Map>> = std::sync::OnceLock::new();
/// Prune, optionally update, and snapshot the store selected by `id`.
///
/// While holding the global lock this:
/// 1. creates the store for `id` if it does not exist yet,
/// 2. removes every entry whose `expires_at` is not later than the current
///    time and, when `info` is given, every entry with the same `agent`,
/// 3. when `info` is given, trims the store to at most 31 entries by
///    evicting from index 16 onward and then appends `info`, so the store
///    never holds more than 32 entries,
/// 4. returns a clone of the resulting store.
///
/// # Panics
///
/// Panics if the global mutex is poisoned.
fn stat_process(
    id: Arc<str>,
    info: Option<Arc<AgentInfoSigned>>,
) -> Vec<Arc<AgentInfoSigned>> {
    // Length the store is cut down to before a new entry is appended.
    const TRIM_TO: usize = 31;
    // Index at which trimming evicts; entries in front of it are kept.
    const EVICT_AT: usize = 16;

    let mut stores = STAT.get_or_init(Default::default).lock().unwrap();
    let store = stores.entry(id).or_default();
    let now = Timestamp::now();

    store.retain(|existing| {
        let superseded = matches!(
            &info,
            Some(incoming) if existing.agent == incoming.agent
        );
        let expired = existing.expires_at <= now;
        !superseded && !expired
    });

    if let Some(incoming) = info {
        if store.len() > TRIM_TO {
            // Removing the run starting at EVICT_AT is the same as
            // repeatedly removing the element at EVICT_AT.
            let excess = store.len() - TRIM_TO;
            store.drain(EVICT_AT..EVICT_AT + excess);
        }
        store.push(incoming);
    }

    store.clone()
}
// Unit tests: compiled only for test builds; the module body lives in a
// separate file.
#[cfg(test)]
mod test;