use linkspace_common::prelude::{U16, pkt_fmt, U32 };
use tracing::debug_span;
use crate::{*, runtime::{lk_get_all, lk_watch2}};
/// Root of every status path: built by `ipath1` from the byte `255` followed by the ASCII text `status`.
/// `lk_status_path` starts from this value and appends the group, object type and optional instance.
// NOTE(review): `ipath1` presumably builds a single-component path — confirm against its definition.
pub const STATUS_PATH: IPathC<16> = ipath1(concat_bytes!([255], b"status"));
/// Identifies a status entry: the domain it is exchanged in, the group it is about,
/// the kind of object and (optionally) a specific instance of that object.
///
/// The byte slices are borrowed for `'o`; the struct itself is `Copy`.
#[derive(Copy, Clone)]
#[repr(C)]
pub struct LkStatus<'o> {
/// Domain used for the status request/reply packets and for the queries built in this file.
pub domain: Domain,
/// Group the status is about. It becomes a path component; the packets themselves are
/// created and queried in the `PRIVATE` group.
pub group: GroupID,
/// Object type, appended as a path component after the group.
pub objtype: &'o [u8],
/// Optional instance name, appended as the last path component when set.
/// `lk_status_set` substitutes `b"default"` when this is `None`.
pub instance: Option<&'o [u8]>,
}
/// Debug output for [`LkStatus`]: the `objtype` and `instance` byte slices are wrapped in `AB`
/// before being handed to the formatter (presumably for a readable byte rendering — see `AB`).
impl<'o> std::fmt::Debug for LkStatus<'o> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let objtype = AB(&self.objtype);
        let instance = self.instance.map(AB);

        let mut out = f.debug_struct("LkStatus");
        out.field("domain", &self.domain);
        out.field("group", &self.group);
        out.field("objtype", &objtype);
        out.field("instance", &instance);
        out.finish()
    }
}
#[doc(hidden)]
pub fn lk_status_path(status: LkStatus) -> LkResult<IPathBuf> {
let mut path = STATUS_PATH.into_ipathbuf();
path.try_append_component(&*status.group)?;
path.try_append_component(status.objtype)?;
if let Some(v) = status.instance {
path.try_append_component(v)?;
}
Ok(path)
}
/// Create (but do not save) a status request packet for `status`.
///
/// The request is a linkpoint in `status.domain` and the `PRIVATE` group, located at
/// [`lk_status_path`], built with two empty slices and `None` as the trailing arguments.
/// `lk_status_poll` recognises requests by their empty links and empty data.
///
/// # Errors
/// Fails if the path cannot be built or if `lk_linkpoint` rejects the arguments.
pub fn lk_status_request(status: LkStatus) -> LkResult<NetPktBox> {
    let path = lk_status_path(status)?;
    lk_linkpoint(status.domain, PRIVATE, &path, &[], &[], None)
}
/// Build a query matching every status packet (requests and replies) for `status`
/// that was created less than `max_age` ago.
///
/// The query selects the `PRIVATE` group, `status.domain`, a create stamp greater than
/// `now() - max_age` (saturating), and the [`lk_status_path`] of `status` as prefix.
///
/// # Errors
/// Fails if the path cannot be built or if any predicate is rejected by `lk_query_push`.
pub fn lk_status_overwatch(status: LkStatus, max_age: Stamp) -> LkResult<Query> {
    let prefix = lk_status_path(status)?;
    let query = lk_query(None);
    let oldest_create = now().saturating_sub(max_age);

    let query = lk_query_push(query, "group", "=", &*PRIVATE)?;
    let query = lk_query_push(query, "domain", "=", &*status.domain)?;
    let query = lk_query_push(query, "create", ">", &*oldest_create)?;
    let query = lk_query_push(query, "prefix", "=", prefix.spath_bytes())?;
    Ok(query)
}
/// Collect recent status replies for `status` and keep watching for new ones,
/// issuing a fresh request when nobody asked recently.
///
/// 1. Runs the [`lk_status_overwatch`] query with `d_timeout` as the maximum age.
///    Packets with no links and no data are treated as earlier requests and their create
///    stamp is remembered; every other packet is passed to `cb`.
/// 2. If no earlier request was seen, one is created with [`lk_status_request`] and saved.
/// 3. Registers a watch (through `lk_watch2`) for packets with data and links whose receive
///    stamp is below `last request + d_timeout`, delivering them to `cb`.
///
/// `watch_id` becomes the id of the watch; when `None`, an id is built from `b"status"`
/// and the current stamp.
///
/// # Errors
/// Propagates failures from building the query or the request, from reading or saving
/// packets, and from registering the watch.
pub fn lk_status_poll(
    lk: &Linkspace,
    status: LkStatus,
    d_timeout: Stamp,
    mut cb: impl PktHandler + 'static,
    watch_id: Option<&[u8]>,
) -> LkResult<()> {
    let span = debug_span!("status_poll",?status,?d_timeout);
    // The guard must be bound to a name: `let _ = span.enter()` drops it on the spot,
    // leaving the span inactive for everything logged below.
    let entered = span.enter();
    let mut last_request = Stamp::ZERO;
    let mut query: Query = lk_status_overwatch(status, d_timeout)?;
    lk_get_all(lk, &query, &mut |pkt| {
        if pkt.get_links().is_empty() && pkt.data().is_empty() {
            // An empty packet is a request somebody already made.
            last_request = *pkt.get_create_stamp();
            tracing::debug!(pkt=%pkt_fmt(&pkt),"recently requested");
            true
        } else {
            let cnt = cb.handle_pkt(pkt, lk);
            tracing::debug!("recently replied");
            cnt.is_continue()
        }
    })?;
    if last_request == Stamp::ZERO {
        tracing::debug!("creating new req");
        // Propagate instead of panicking: the caller already handles `LkResult`.
        let req = lk_status_request(status)?;
        last_request = *req.get_create_stamp();
        lk_save(lk, &req)?;
    }
    let wait_until = last_request.saturating_add(d_timeout);
    tracing::debug!(?wait_until,"Waiting until");
    // Narrow the query to replies (non-empty data and links) received before the deadline.
    query = lk_query_push(query, "data_size", ">", &*U16::ZERO)?;
    query = lk_query_push(query, "links_len", ">", &*U16::ZERO)?;
    query = lk_query_push(query, "recv", "<", &*wait_until)?;
    query = match watch_id {
        Some(id) => lk_query_push(query, "", "id", id)?,
        None => {
            let id = [b"status" as &[u8], &*now()].concat();
            lk_query_push(query, "", "id", &id)?
        }
    };
    // Release the borrow on the span before handing ownership of it to the watch.
    drop(entered);
    lk_watch2(lk, &query, cb, span)?;
    Ok(())
}
fn is_status_reply(status:LkStatus,path:&IPath,pkt:&NetPktPtr) -> LkResult<()>{
anyhow::ensure!(*pkt.get_domain() == status.domain
&& *pkt.get_group() == PRIVATE
&& pkt.get_ipath() == path
&& !pkt.get_links().is_empty()
&& !pkt.data().is_empty()
,"invalid status update");
Ok(())
}
/// Publish a status for `status` and keep answering status requests with fresh replies.
///
/// `update` is called as `update(lk, domain, PRIVATE, path, link)` and must return the reply
/// packet. Every packet it returns is validated with `is_status_reply` and then saved.
///
/// * When `status.instance` is `None`, the instance `b"default"` is used.
/// * The initial packet is requested with a link tagged `init` pointing at `PRIVATE`.
/// * A watch is then registered for packets with no data and no links below the status path
///   *without* the instance component. For each one, `update` is called again with a link
///   tagged `prev` pointing at the hash of the previously saved reply.
///
/// # Errors
/// Fails if the path or query cannot be built, if `update` fails or returns a packet that is
/// not a valid status reply, or if saving or registering the watch fails.
pub fn lk_status_set(
    lk: &Linkspace,
    status: LkStatus,
    mut update: impl FnMut(&Linkspace, Domain, GroupID, &IPath, Link) -> LkResult<NetPktBox> + 'static,
) -> LkResult<()> {
    let span = debug_span!("status_set",?status);
    // The guard must be bound to a name: `let _ = span.enter()` drops it on the spot,
    // leaving the span inactive for everything logged below.
    let entered = span.enter();
    let LkStatus { domain, group, objtype, instance } = status;
    // Own the byte slices so they can move into the 'static watch callback.
    let objtype = objtype.to_vec();
    let instance = instance.or(Some(b"default")).map(Vec::from);
    let status = LkStatus { instance: instance.as_deref(), domain, group, objtype: &objtype };
    let path = lk_status_path(status)?;
    let link = Link { tag: ab(b"init"), ptr: PRIVATE };
    let initpkt = update(lk, status.domain, PRIVATE, &path, link)?;
    is_status_reply(status, &path, &initpkt)?;
    let mut prev = initpkt.hash();
    tracing::debug!(?initpkt,"init status");
    lk_save(lk, &initpkt)?;
    std::mem::drop(initpkt);
    let mut q = lk_query(None);
    // Watch one level up so requests addressed to the object type as a whole are seen too.
    let prefix = lk_status_path(LkStatus { instance: None, ..status })?;
    q = lk_query_push(q, "data_size", "=", &*U16::ZERO)?;
    q = lk_query_push(q, "links_len", "=", &*U16::ZERO)?;
    q = lk_query_push(q, "prefix", "=", prefix.spath_bytes())?;
    // NOTE(review): `i_index < 0` presumably excludes already-stored packets so only
    // newly arriving requests trigger the callback — confirm against the query docs.
    q = lk_query_push(q, "i_index", "<", &*U32::ZERO)?;
    q = lk_query_push(q, "", "id", &[b"status-update" as &[u8], &*now()].concat())?;
    // Release the borrow on the span before handing ownership of it to the watch.
    drop(entered);
    lk_watch2(lk, &q, cb(move |pkt: &dyn NetPkt, lk: &Linkspace| -> LkResult<()> {
        let status = LkStatus { instance: instance.as_deref(), domain, group, objtype: &objtype };
        let p = pkt.get_ipath();
        // A request with a path as long as ours but different from it is skipped
        // (presumably it targets another instance — confirm `len` semantics).
        if p.len() == path.len() && p.spath() != path.as_ref() { return Ok(()) }
        let link = Link { tag: ab(b"prev"), ptr: prev };
        let reply = update(lk, status.domain, PRIVATE, &path, link)?;
        is_status_reply(status, &path, &reply)?;
        prev = reply.hash();
        tracing::debug!(?reply,"Reply status");
        lk_save(lk, &reply)?;
        Ok(())
    }), span)?;
    Ok(())
}