use async_graphql::*;
use futures::Stream;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use super::schema::{TripleFilter, ValidationEvent};
use crate::state::{AppState, Event};
/// Root type for GraphQL subscriptions.
///
/// It carries no state of its own: every resolver in its `#[Subscription]` impl obtains
/// what it needs from the request [`Context`] (or, for `heartbeat`, from its argument alone).
pub struct SubscriptionRoot;
#[Subscription]
impl SubscriptionRoot {
async fn triple_added(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Filter criteria for triples")] filter: Option<TripleFilter>,
) -> impl Stream<Item = TripleEvent> {
let state = ctx.data_unchecked::<AppState>();
let rx = state.broadcaster.subscribe();
BroadcastStream::new(rx).filter_map(move |result| {
match result {
Ok(Event::TripleAdded {
hash,
subject,
predicate,
object,
}) => {
if let Some(ref f) = filter {
if let Some(ref s) = f.subject {
if &subject != s {
return None;
}
}
if let Some(ref p) = f.predicate {
if &predicate != p {
return None;
}
}
if let Some(ref sp) = f.subject_prefix {
if !subject.starts_with(sp) {
return None;
}
}
if let Some(ref pp) = f.predicate_prefix {
if !predicate.starts_with(pp) {
return None;
}
}
}
Some(TripleEvent {
event_type: "ADDED".to_string(),
hash,
subject,
predicate,
object: object.to_string(),
timestamp: chrono::Utc::now(),
})
}
_ => None,
}
})
}
async fn triple_deleted(&self, ctx: &Context<'_>) -> impl Stream<Item = TripleDeletionEvent> {
let state = ctx.data_unchecked::<AppState>();
let rx = state.broadcaster.subscribe();
BroadcastStream::new(rx).filter_map(|result| match result {
Ok(Event::TripleDeleted { hash }) => Some(TripleDeletionEvent {
hash,
timestamp: chrono::Utc::now(),
}),
_ => None,
})
}
async fn validation_event(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Only emit valid validation events", default = false)] valid_only: bool,
) -> impl Stream<Item = ValidationEvent> {
let state = ctx.data_unchecked::<AppState>();
let rx = state.broadcaster.subscribe();
BroadcastStream::new(rx).filter_map(move |result| match result {
Ok(Event::ValidationCompleted {
hash,
valid,
proof_hash,
}) => {
if valid_only && !valid {
None
} else {
Some(ValidationEvent {
hash,
valid,
proof_hash,
timestamp: chrono::Utc::now(),
})
}
}
_ => None,
})
}
async fn agent_activity(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Filter by specific agent ID")] agent_id: Option<String>,
) -> impl Stream<Item = AgentActivityEvent> {
let state = ctx.data_unchecked::<AppState>();
let rx = state.broadcaster.subscribe();
BroadcastStream::new(rx).filter_map(move |result| match result {
Ok(Event::TripleAdded {
hash,
subject,
predicate,
..
}) => {
if let Some(ref filter_agent) = agent_id {
if !subject.contains(filter_agent) {
return None;
}
}
Some(AgentActivityEvent {
agent_id: subject.clone(),
action: "ADDED_TRIPLE".to_string(),
triple_hash: hash,
predicate: Some(predicate),
timestamp: chrono::Utc::now(),
})
}
Ok(Event::TripleDeleted { hash }) => Some(AgentActivityEvent {
agent_id: "system".to_string(),
action: "DELETED_TRIPLE".to_string(),
triple_hash: hash,
predicate: None,
timestamp: chrono::Utc::now(),
}),
_ => None,
})
}
async fn heartbeat(
&self,
#[graphql(desc = "Interval in seconds", default = 30)] interval_secs: u64,
) -> impl Stream<Item = HeartbeatEvent> {
let interval = std::time::Duration::from_secs(interval_secs.max(5).min(300));
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)).map(|_| {
HeartbeatEvent {
timestamp: chrono::Utc::now(),
server_time: chrono::Utc::now().to_rfc3339(),
}
})
}
async fn events(&self, ctx: &Context<'_>) -> impl Stream<Item = String> {
let state = ctx.data_unchecked::<AppState>();
let rx = state.broadcaster.subscribe();
BroadcastStream::new(rx).filter_map(|result| match result {
Ok(event) => Some(event.to_json()),
_ => None,
})
}
}
/// Payload describing a change to a triple, as emitted by the `triple_added` subscription.
#[derive(Debug, Clone, SimpleObject)]
pub struct TripleEvent {
    /// Kind of change; the `triple_added` subscription sets this to `"ADDED"`.
    pub event_type: String,
    /// Hash carried by the originating `TripleAdded` event.
    pub hash: String,
    /// Subject of the triple.
    pub subject: String,
    /// Predicate of the triple.
    pub predicate: String,
    /// Object of the triple, rendered to a string via `to_string()`.
    pub object: String,
    /// UTC time at which the event was forwarded to the subscriber.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Payload emitted by the `triple_deleted` subscription when a triple is removed.
#[derive(Debug, Clone, SimpleObject)]
pub struct TripleDeletionEvent {
    /// Hash carried by the originating `TripleDeleted` event.
    pub hash: String,
    /// UTC time at which the event was forwarded to the subscriber.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Payload emitted by the `agent_activity` subscription for triple additions and deletions.
#[derive(Debug, Clone, SimpleObject)]
pub struct AgentActivityEvent {
    /// Agent the activity is attributed to: the triple's subject for additions,
    /// `"system"` for deletions.
    pub agent_id: String,
    /// Activity kind; `agent_activity` uses `"ADDED_TRIPLE"` and `"DELETED_TRIPLE"`.
    pub action: String,
    /// Hash carried by the originating triple event.
    pub triple_hash: String,
    /// Predicate of the added triple; `None` for deletions.
    pub predicate: Option<String>,
    /// UTC time at which the event was forwarded to the subscriber.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Payload emitted by the `heartbeat` subscription on every tick.
#[derive(Debug, Clone, SimpleObject)]
pub struct HeartbeatEvent {
    /// UTC time at which the heartbeat was produced.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Server clock reading formatted as an RFC 3339 string.
    pub server_time: String,
}