use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use super::summarizer::SummaryAnnouncement;
use super::AggregatorDaemon;
use crate::adapter::net::cortex::rpc::{
RpcContext, RpcHandler, RpcHandlerError, RpcResponsePayload, RpcStatus,
};
pub const FOLD_QUERY_SERVICE: &str = "fold.query";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FoldQueryRequest {
pub kind: u16,
pub op: FoldQueryOp,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FoldQueryOp {
LatestSummary,
SummarizeNow,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FoldQueryResponse {
Summaries {
kind: u16,
summaries: Vec<SummaryAnnouncement>,
},
Error(FoldQueryError),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FoldQueryError {
UnknownKind {
kind: u16,
},
DecodeFailed(String),
}
pub struct FoldQueryHandler {
aggregator: Arc<AggregatorDaemon>,
}
impl FoldQueryHandler {
pub fn new(aggregator: Arc<AggregatorDaemon>) -> Self {
Self { aggregator }
}
}
#[async_trait]
impl RpcHandler for FoldQueryHandler {
async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
let request: FoldQueryRequest = match postcard::from_bytes(&ctx.payload.body) {
Ok(req) => req,
Err(e) => {
let response =
FoldQueryResponse::Error(FoldQueryError::DecodeFailed(e.to_string()));
return Ok(encode_response(&response));
}
};
let response = answer(&self.aggregator, &request);
Ok(encode_response(&response))
}
}
pub(crate) fn answer(
aggregator: &Arc<AggregatorDaemon>,
request: &FoldQueryRequest,
) -> FoldQueryResponse {
let configured = aggregator.config().fold_kinds.contains(&request.kind);
if !configured {
return FoldQueryResponse::Error(FoldQueryError::UnknownKind { kind: request.kind });
}
let summaries: Vec<SummaryAnnouncement> = match request.op {
FoldQueryOp::LatestSummary => aggregator
.latest_summaries()
.into_iter()
.filter(|s| s.fold_kind == request.kind)
.collect(),
FoldQueryOp::SummarizeNow => {
let fresh: Vec<SummaryAnnouncement> = aggregator
.tick_once()
.into_iter()
.filter(|s| s.fold_kind == request.kind)
.collect();
if fresh.is_empty() {
aggregator
.latest_summaries()
.into_iter()
.filter(|s| s.fold_kind == request.kind)
.collect()
} else {
fresh
}
}
};
FoldQueryResponse::Summaries {
kind: request.kind,
summaries,
}
}
fn encode_response(response: &FoldQueryResponse) -> RpcResponsePayload {
let body = match postcard::to_allocvec(response) {
Ok(b) => Bytes::from(b),
Err(e) => {
tracing::warn!(
error = %e,
"aggregator: fold.query response encode failed; replying with empty body",
);
Bytes::new()
}
};
RpcResponsePayload {
status: RpcStatus::Ok,
headers: Vec::new(),
body,
}
}
impl AggregatorDaemon {
pub fn query_handler(self: &Arc<Self>) -> FoldQueryHandler {
FoldQueryHandler::new(self.clone())
}
pub fn install_query_service(
self: &Arc<Self>,
mesh: &Arc<crate::adapter::net::MeshNode>,
) -> Result<crate::adapter::net::mesh_rpc::ServeHandle, crate::adapter::net::mesh_rpc::ServeError>
{
mesh.serve_rpc(FOLD_QUERY_SERVICE, Arc::new(self.query_handler()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::aggregator::AggregatorConfig;
use crate::adapter::net::behavior::fold::capability::{CapabilityFold, CapabilityMembership};
use crate::adapter::net::behavior::fold::wire::SignedAnnouncement;
use crate::adapter::net::behavior::fold::{EnvelopeMeta, FoldKind, NodeState};
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::{MeshNode, MeshNodeConfig, SubnetId};
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::time::Duration;
async fn build_mesh() -> Arc<MeshNode> {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let cfg = MeshNodeConfig::new(addr, [0x17u8; 32]);
Arc::new(
MeshNode::new(EntityKeypair::generate(), cfg)
.await
.expect("MeshNode::new"),
)
}
fn sign_cap(
kp: &EntityKeypair,
publisher: u64,
class: u64,
state: NodeState,
) -> SignedAnnouncement<CapabilityMembership> {
SignedAnnouncement::sign(
kp,
CapabilityFold::KIND_ID,
class,
publisher,
1,
EnvelopeMeta::default(),
CapabilityMembership {
class_hash: class,
tags: Vec::new(),
hardware: None,
state,
region: None,
price_quote: None,
reflex_addr: None,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: BTreeMap::new(),
},
)
.expect("sign")
}
async fn aggregator_with_idle_publisher() -> Arc<AggregatorDaemon> {
let mesh = build_mesh().await;
let kp = EntityKeypair::generate();
let fold = mesh.capability_fold();
fold.apply(sign_cap(&kp, 0xA, 1, NodeState::Idle)).unwrap();
let cfg = AggregatorConfig::new(SubnetId::new(&[3]))
.with_fold_kind(CapabilityFold::KIND_ID)
.with_interval(Duration::from_secs(60));
Arc::new(AggregatorDaemon::new(cfg, mesh).expect("new"))
}
#[tokio::test]
async fn answer_returns_summaries_for_known_kind() {
let agg = aggregator_with_idle_publisher().await;
agg.tick_once();
let req = FoldQueryRequest {
kind: CapabilityFold::KIND_ID,
op: FoldQueryOp::LatestSummary,
};
let resp = answer(&agg, &req);
match resp {
FoldQueryResponse::Summaries { kind, summaries } => {
assert_eq!(kind, CapabilityFold::KIND_ID);
assert_eq!(summaries.len(), 1);
let idle = summaries[0]
.buckets
.iter()
.find(|(n, _)| n == "idle")
.map(|(_, c)| *c)
.unwrap_or(0);
assert_eq!(idle, 1);
}
other => panic!("expected Summaries, got {other:?}"),
}
}
#[tokio::test]
async fn answer_rejects_unknown_kind() {
let agg = aggregator_with_idle_publisher().await;
let req = FoldQueryRequest {
kind: 0xDEAD,
op: FoldQueryOp::LatestSummary,
};
let resp = answer(&agg, &req);
match resp {
FoldQueryResponse::Error(FoldQueryError::UnknownKind { kind }) => {
assert_eq!(kind, 0xDEAD);
}
other => panic!("expected UnknownKind, got {other:?}"),
}
}
#[tokio::test]
async fn summarize_now_forces_a_fresh_tick_before_answering() {
let agg = aggregator_with_idle_publisher().await;
let gen_before = agg.generation();
let req = FoldQueryRequest {
kind: CapabilityFold::KIND_ID,
op: FoldQueryOp::SummarizeNow,
};
let _ = answer(&agg, &req);
assert_eq!(agg.generation(), gen_before + 1);
}
#[tokio::test]
async fn summarize_now_falls_back_to_latest_when_tick_is_a_noop() {
let agg = aggregator_with_idle_publisher().await;
let req = FoldQueryRequest {
kind: CapabilityFold::KIND_ID,
op: FoldQueryOp::SummarizeNow,
};
let first = answer(&agg, &req);
match first {
FoldQueryResponse::Summaries { ref summaries, .. } => {
assert_eq!(summaries.len(), 1, "first tick produces a novel summary");
}
other => panic!("expected Summaries, got {other:?}"),
}
let second = answer(&agg, &req);
match second {
FoldQueryResponse::Summaries { summaries, .. } => {
assert_eq!(
summaries.len(),
1,
"no-op tick must still return the cached latest summary"
);
}
other => panic!("expected Summaries, got {other:?}"),
}
}
#[test]
fn request_response_round_trips_through_postcard() {
let req = FoldQueryRequest {
kind: CapabilityFold::KIND_ID,
op: FoldQueryOp::SummarizeNow,
};
let bytes = postcard::to_allocvec(&req).expect("encode");
let back: FoldQueryRequest = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(back, req);
let resp = FoldQueryResponse::Summaries {
kind: 0x0001,
summaries: vec![SummaryAnnouncement {
source_subnet: SubnetId::new(&[3, 7]),
fold_kind: 0x0001,
generation: 42,
buckets: vec![("idle".to_string(), 4), ("busy".to_string(), 1)],
}],
};
let bytes = postcard::to_allocvec(&resp).expect("encode");
let back: FoldQueryResponse = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(back, resp);
let err = FoldQueryResponse::Error(FoldQueryError::UnknownKind { kind: 0xDEAD });
let bytes = postcard::to_allocvec(&err).expect("encode");
let back: FoldQueryResponse = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(back, err);
}
}