use kitsune2_api::*;
use std::sync::Arc;
/// Configuration types for the core bootstrap module.
pub mod config {
    /// Settings for the core bootstrap client.
    ///
    /// Field names are (de)serialized in camelCase.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "camelCase")]
    pub struct CoreBootstrapConfig {
        /// Base url of the bootstrap server.
        pub server_url: String,

        /// Lower bound of the retry backoff, in milliseconds.
        pub backoff_min_ms: u32,

        /// Upper bound of the retry backoff, in milliseconds.
        pub backoff_max_ms: u32,
    }

    impl Default for CoreBootstrapConfig {
        fn default() -> Self {
            Self {
                // NOTE(review): this looks like a placeholder that deployments
                // are expected to override with a real server url.
                server_url: String::from("<https://your.bootstrap.url>"),
                // 5 seconds.
                backoff_min_ms: 5_000,
                // 5 minutes.
                backoff_max_ms: 300_000,
            }
        }
    }

    impl CoreBootstrapConfig {
        /// The minimum backoff as a [`std::time::Duration`].
        pub fn backoff_min(&self) -> std::time::Duration {
            let millis = u64::from(self.backoff_min_ms);
            std::time::Duration::from_millis(millis)
        }

        /// The maximum backoff as a [`std::time::Duration`].
        pub fn backoff_max(&self) -> std::time::Duration {
            let millis = u64::from(self.backoff_max_ms);
            std::time::Duration::from_millis(millis)
        }
    }

    /// Module-level wrapper that nests [`CoreBootstrapConfig`] under the
    /// `coreBootstrap` key when (de)serialized.
    #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "camelCase")]
    pub struct CoreBootstrapModConfig {
        /// The core bootstrap settings.
        pub core_bootstrap: CoreBootstrapConfig,
    }
}
pub use config::*;
/// Factory for the core bootstrap module.
///
/// Carries no state of its own; its `BootstrapFactory` implementation reads
/// everything it needs from the configuration and builder passed to it.
#[derive(Debug)]
pub struct CoreBootstrapFactory {}
impl CoreBootstrapFactory {
pub fn create() -> DynBootstrapFactory {
let out: DynBootstrapFactory = Arc::new(CoreBootstrapFactory {});
out
}
}
impl BootstrapFactory for CoreBootstrapFactory {
    /// Store this module's default configuration in `config`.
    fn default_config(&self, config: &mut Config) -> K2Result<()> {
        let defaults = CoreBootstrapModConfig::default();
        config.set_module_config(&defaults)
    }

    /// Check that the configured `server_url` is usable.
    ///
    /// # Errors
    ///
    /// Returns an error when the module config cannot be read, when
    /// `server_url` does not parse as a url, when the parsed url cannot be a
    /// base, or when its scheme is neither `http` nor `https`.
    fn validate_config(&self, config: &Config) -> K2Result<()> {
        const ERR: &str = "invalid bootstrap server_url";

        let module: CoreBootstrapModConfig = config.get_module_config()?;
        let server_url = module.core_bootstrap.server_url.as_str();

        // A parse failure keeps the underlying error attached as the source.
        let parsed = match url::Url::parse(server_url) {
            Ok(parsed) => parsed,
            Err(err) => return Err(K2Error::other_src(ERR, err)),
        };

        let scheme_ok = matches!(parsed.scheme(), "http" | "https");
        if parsed.cannot_be_a_base() || !scheme_ok {
            return Err(K2Error::other(ERR));
        }

        Ok(())
    }

    /// Build a bootstrap instance for `space`.
    ///
    /// The module config is read from `builder.config` when the returned
    /// future runs; a failure to read it is returned as the future's error.
    fn create(
        &self,
        builder: Arc<Builder>,
        peer_store: DynPeerStore,
        space: SpaceId,
    ) -> BoxFut<'static, K2Result<DynBootstrap>> {
        Box::pin(async move {
            let module: CoreBootstrapModConfig =
                builder.config.get_module_config()?;

            let bootstrap = CoreBootstrap::new(
                builder,
                module.core_bootstrap,
                peer_store,
                space,
            );
            let bootstrap: DynBootstrap = Arc::new(bootstrap);

            Ok(bootstrap)
        })
    }
}
/// Sending half of the bounded channel that carries agent infos to the push
/// task.
type PushSend = tokio::sync::mpsc::Sender<Arc<AgentInfoSigned>>;
/// Receiving half of the same channel, consumed by the push task.
type PushRecv = tokio::sync::mpsc::Receiver<Arc<AgentInfoSigned>>;
/// Bootstrap client bound to a single space.
///
/// Holds the handles of two spawned background tasks (one pushing agent
/// infos to the server, one polling the server); both are aborted when this
/// value is dropped.
#[derive(Debug)]
struct CoreBootstrap {
/// The space this instance serves; `put` refuses agent infos whose space
/// differs from it.
space: SpaceId,
/// Queue into the push task.
push_send: PushSend,
/// Handle of the spawned push task.
push_task: tokio::task::JoinHandle<()>,
/// Handle of the spawned poll task.
poll_task: tokio::task::JoinHandle<()>,
}
impl Drop for CoreBootstrap {
    /// Abort both background tasks (push first, then poll) so they do not
    /// keep running after the bootstrap instance is gone.
    fn drop(&mut self) {
        for task in [&self.push_task, &self.poll_task] {
            task.abort();
        }
    }
}
impl CoreBootstrap {
    /// Create a bootstrap instance and spawn its two background tasks.
    ///
    /// * `builder` - handed to the poll task, which uses `builder.verifier`
    ///   to decode the agent infos returned by the server.
    /// * `config` - server url and backoff settings shared by both tasks.
    /// * `peer_store` - where the poll task inserts discovered agent infos.
    /// * `space` - the space to push to / poll from.
    ///
    /// The tasks are started with `tokio::task::spawn`, so this must be
    /// called from within a tokio runtime.
    pub fn new(
        builder: Arc<Builder>,
        config: CoreBootstrapConfig,
        peer_store: DynPeerStore,
        space: SpaceId,
    ) -> Self {
        // Both tasks build request urls as "{server_url}/bootstrap/...".
        // A configured url ending in '/' would therefore produce
        // "//bootstrap/...", so strip trailing slashes once, up front.
        let server_url: Arc<str> =
            Arc::from(config.server_url.trim_end_matches('/'));

        // Bounded queue: `put` drops new infos when it is full.
        let (push_send, push_recv) = tokio::sync::mpsc::channel(1024);

        // The push task gets its own sender so it can re-queue failed puts.
        let push_task = tokio::task::spawn(push_task(
            config.clone(),
            server_url.clone(),
            push_send.clone(),
            push_recv,
        ));

        let poll_task = tokio::task::spawn(poll_task(
            builder,
            config,
            server_url,
            space.clone(),
            peer_store,
        ));

        Self {
            space,
            push_send,
            push_task,
            poll_task,
        }
    }
}
impl Bootstrap for CoreBootstrap {
    /// Queue `info` for publication to the bootstrap server.
    ///
    /// Never blocks: an info belonging to a different space is logged as an
    /// error and ignored, and an info that does not fit in the push queue is
    /// logged as a warning and dropped.
    fn put(&self, info: Arc<AgentInfoSigned>) {
        if info.space == self.space {
            // `try_send` fails rather than waiting when the queue is full.
            if let Err(err) = self.push_send.try_send(info) {
                tracing::warn!(?err, "Bootstrap overloaded, dropping put");
            }
        } else {
            tracing::error!(
                ?info,
                "Logic Error: Attempting to put an agent outside of this space"
            );
        }
    }
}
/// Background loop that publishes queued agent infos to the bootstrap server.
///
/// Each info received on `push_recv` is encoded and sent with an HTTP `PUT`
/// to `{server_url}/bootstrap/{space}/{agent}`.
///
/// * On success the backoff is cleared and the next info is processed
///   immediately.
/// * On failure (the request returned an error, or the blocking task could
///   not be joined) the failure is logged, the info is re-queued through
///   `push_send` if it has not expired yet, and the loop sleeps. The sleep
///   starts at `config.backoff_min()` and doubles on each consecutive
///   failure, capped at `config.backoff_max()`.
/// * An info that cannot be encoded is logged and dropped without retrying.
///
/// This task owns a sender for its own queue, so the loop is expected to end
/// by the task being aborted rather than by the channel closing.
async fn push_task(
    config: CoreBootstrapConfig,
    server_url: Arc<str>,
    push_send: PushSend,
    mut push_recv: PushRecv,
) {
    // `None` means the previous put succeeded and no backoff is in effect.
    let mut wait = None;

    while let Some(info) = push_recv.recv().await {
        let url =
            format!("{server_url}/bootstrap/{}/{}", &info.space, &info.agent);

        let enc = match info.encode() {
            Ok(enc) => enc,
            Err(err) => {
                tracing::error!(?err, "Could not encode agent info, dropping");
                continue;
            }
        };

        // The ureq call is synchronous, so run it on the blocking pool.
        let result = tokio::task::spawn_blocking(move || {
            ureq::put(&url).send_string(&enc)
        })
        .await;

        match result {
            Ok(Ok(_)) => {
                wait = None;
                continue;
            }
            Ok(Err(err)) => tracing::debug!(
                ?err,
                "failure pushing agent info to bootstrap server"
            ),
            Err(err) => tracing::debug!(
                ?err,
                "failure joining bootstrap push request task"
            ),
        }

        // The put failed: try again later unless the info has expired.
        if info.expires_at > Timestamp::now() {
            if let Err(err) = push_send.try_send(info) {
                tracing::warn!(
                    ?err,
                    "Bootstrap overloaded, dropping retry of failed put"
                );
            }
        }

        // Exponential backoff between failed attempts.
        let delay = match wait {
            None => config.backoff_min(),
            Some(prev) => (prev * 2).min(config.backoff_max()),
        };
        wait = Some(delay);
        tokio::time::sleep(delay).await;
    }
}
async fn poll_task(
builder: Arc<Builder>,
config: CoreBootstrapConfig,
server_url: Arc<str>,
space: SpaceId,
peer_store: DynPeerStore,
) {
let mut wait = config.backoff_min();
loop {
let url = format!("{server_url}/bootstrap/{space}");
match tokio::task::spawn_blocking(move || {
ureq::get(&url)
.call()
.map_err(K2Error::other)?
.into_string()
.map_err(K2Error::other)
})
.await
.map_err(|_| K2Error::other("task join error"))
{
Err(err) | Ok(Err(err)) => {
tracing::debug!(?err, "failure contacting bootstrap server");
}
Ok(Ok(data)) => {
match AgentInfoSigned::decode_list(
&builder.verifier,
data.as_bytes(),
) {
Err(err) => tracing::debug!(
?err,
"failure decoding bootstrap server response"
),
Ok(list) => {
wait = config.backoff_max();
let list = list
.into_iter()
.filter_map(|l| match l {
Ok(l) => Some(l),
Err(err) => {
tracing::debug!(
?err,
"failure decoding bootstrap agent info"
);
None
}
})
.collect::<Vec<_>>();
let _ = peer_store.insert(list).await;
}
}
}
}
wait *= 2;
if wait > config.backoff_max() {
wait = config.backoff_max();
}
tokio::time::sleep(wait).await;
}
}
#[cfg(test)]
mod test;