use crate::state::AppState;
use aingle_graph::{NodeId, Predicate, Triple, Value};
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Debug)]
pub struct BatchStoreEventsRequest {
pub namespace: String,
pub events: Vec<TraceEventInput>,
}
#[derive(Deserialize, Debug)]
pub struct TraceEventInput {
pub id: String,
pub event_type: String,
pub agent_id: String,
pub timestamp: String,
#[serde(default)]
pub session: Option<String>,
#[serde(default)]
pub parent_event: Option<String>,
#[serde(default)]
pub duration_ms: Option<u64>,
#[serde(default)]
pub fields: std::collections::HashMap<String, String>,
}
#[derive(Serialize, Debug)]
pub struct BatchStoreEventsResponse {
pub stored: usize,
pub triples_created: usize,
}
#[derive(Deserialize, Debug, Default)]
pub struct EventsQuery {
pub agent: Option<String>,
#[serde(rename = "type")]
pub event_type: Option<String>,
pub from: Option<String>,
pub to: Option<String>,
pub limit: Option<usize>,
pub namespace: Option<String>,
}
#[derive(Serialize, Debug)]
pub struct TraceEventOutput {
pub id: String,
pub event_type: String,
pub agent_id: String,
pub timestamp: String,
pub session: Option<String>,
pub parent_event: Option<String>,
pub duration_ms: Option<u64>,
pub fields: std::collections::HashMap<String, String>,
}
#[derive(Serialize, Debug)]
pub struct QueryEventsResponse {
pub events: Vec<TraceEventOutput>,
pub total: usize,
}
#[derive(Serialize, Debug)]
pub struct CausalNode {
pub event_id: String,
pub event_type: String,
pub agent_id: String,
pub timestamp: String,
pub summary: String,
}
#[derive(Serialize, Debug)]
pub struct CausalChainResponse {
pub chain: Vec<CausalNode>,
}
fn value_as_string(v: &Value) -> Option<&str> {
v.as_string()
}
pub async fn batch_store_events(
State(state): State<AppState>,
Json(req): Json<BatchStoreEventsRequest>,
) -> impl IntoResponse {
let ns = &req.namespace;
let graph = state.graph.write().await;
let mut triples_created: usize = 0;
for event in &req.events {
let subj = NodeId::named(format!("{}:event:{}", ns, event.id));
let core = vec![
Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:type", ns)),
Value::literal(&event.event_type),
),
Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:agent", ns)),
Value::node(NodeId::named(format!("{}:agent:{}", ns, event.agent_id))),
),
Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:timestamp", ns)),
Value::literal(&event.timestamp),
),
];
for t in core {
let _ = graph.insert(t);
triples_created += 1;
}
if let Some(ref session) = event.session {
let _ = graph.insert(Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:session", ns)),
Value::node(NodeId::named(format!("{}:session:{}", ns, session))),
));
triples_created += 1;
}
if let Some(ref parent) = event.parent_event {
let _ = graph.insert(Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:parent_event", ns)),
Value::node(NodeId::named(format!("{}:event:{}", ns, parent))),
));
triples_created += 1;
}
if let Some(duration) = event.duration_ms {
let _ = graph.insert(Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:duration_ms", ns)),
Value::literal(duration.to_string()),
));
triples_created += 1;
}
for (key, value) in &event.fields {
let _ = graph.insert(Triple::new(
subj.clone(),
Predicate::named(format!("{}:event:{}", ns, key)),
Value::literal(value),
));
triples_created += 1;
}
}
(
StatusCode::CREATED,
Json(BatchStoreEventsResponse {
stored: req.events.len(),
triples_created,
}),
)
}
pub async fn query_events(
State(state): State<AppState>,
Query(params): Query<EventsQuery>,
) -> impl IntoResponse {
let ns = params.namespace.as_deref().unwrap_or("mayros");
let graph = state.graph.read().await;
let limit = params.limit.unwrap_or(100);
let type_pred = Predicate::named(format!("{}:event:type", ns));
let type_triples = graph
.get_predicate(&type_pred)
.unwrap_or_default();
let mut events: Vec<TraceEventOutput> = Vec::new();
for triple in &type_triples {
let subj = &triple.subject;
let subj_name = subj.as_name().unwrap_or("");
let prefix = format!("{}:event:", ns);
let event_id = subj_name.strip_prefix(&prefix).unwrap_or(subj_name);
let event_type = value_as_string(&triple.object)
.unwrap_or("unknown")
.to_string();
if let Some(ref filter_type) = params.event_type {
if &event_type != filter_type {
continue;
}
}
let all_triples = graph.get_subject(subj).unwrap_or_default();
let agent_pred_str = format!("{}:event:agent", ns);
let agent_id = all_triples
.iter()
.find(|t| t.predicate.as_str() == agent_pred_str)
.and_then(|t| t.object.as_node())
.and_then(|n| n.as_name())
.map(|s| {
let agent_prefix = format!("{}:agent:", ns);
s.strip_prefix(&agent_prefix).unwrap_or(s).to_string()
})
.unwrap_or_default();
if let Some(ref filter_agent) = params.agent {
if &agent_id != filter_agent {
continue;
}
}
let ts_pred_str = format!("{}:event:timestamp", ns);
let timestamp = all_triples
.iter()
.find(|t| t.predicate.as_str() == ts_pred_str)
.and_then(|t| value_as_string(&t.object))
.unwrap_or("")
.to_string();
if let Some(ref from) = params.from {
if timestamp < *from {
continue;
}
}
if let Some(ref to) = params.to {
if timestamp > *to {
continue;
}
}
let session_pred_str = format!("{}:event:session", ns);
let session = all_triples
.iter()
.find(|t| t.predicate.as_str() == session_pred_str)
.and_then(|t| t.object.as_node())
.and_then(|n| n.as_name())
.map(|s| {
let session_prefix = format!("{}:session:", ns);
s.strip_prefix(&session_prefix).unwrap_or(s).to_string()
});
let parent_pred_str = format!("{}:event:parent_event", ns);
let parent_event = all_triples
.iter()
.find(|t| t.predicate.as_str() == parent_pred_str)
.and_then(|t| t.object.as_node())
.and_then(|n| n.as_name())
.map(|s| {
let event_prefix = format!("{}:event:", ns);
s.strip_prefix(&event_prefix).unwrap_or(s).to_string()
});
let dur_pred_str = format!("{}:event:duration_ms", ns);
let duration_ms = all_triples
.iter()
.find(|t| t.predicate.as_str() == dur_pred_str)
.and_then(|t| value_as_string(&t.object))
.and_then(|s| s.parse::<u64>().ok());
let core_preds: std::collections::HashSet<&str> = [
agent_pred_str.as_str(),
ts_pred_str.as_str(),
session_pred_str.as_str(),
parent_pred_str.as_str(),
dur_pred_str.as_str(),
type_pred.as_str(),
]
.into_iter()
.collect();
let event_pred_prefix = format!("{}:event:", ns);
let mut fields = std::collections::HashMap::new();
for t in &all_triples {
let pred_str = t.predicate.as_str();
if !core_preds.contains(pred_str) {
if let Some(key) = pred_str.strip_prefix(&event_pred_prefix) {
if let Some(val) = value_as_string(&t.object) {
fields.insert(key.to_string(), val.to_string());
}
}
}
}
events.push(TraceEventOutput {
id: event_id.to_string(),
event_type,
agent_id,
timestamp,
session,
parent_event,
duration_ms,
fields,
});
if events.len() >= limit {
break;
}
}
let total = events.len();
Json(QueryEventsResponse { events, total })
}
pub async fn get_causal_chain(
State(state): State<AppState>,
Path(event_id): Path<String>,
Query(params): Query<EventsQuery>,
) -> impl IntoResponse {
let ns = params.namespace.as_deref().unwrap_or("mayros");
let graph = state.graph.read().await;
let mut chain: Vec<CausalNode> = Vec::new();
let mut current_id = event_id;
let agent_prefix = format!("{}:agent:", ns);
let event_prefix = format!("{}:event:", ns);
for _ in 0..50 {
let subj = NodeId::named(format!("{}:event:{}", ns, current_id));
let all_triples = graph.get_subject(&subj).unwrap_or_default();
if all_triples.is_empty() {
break; }
let type_pred_str = format!("{}:event:type", ns);
let event_type = all_triples
.iter()
.find(|t| t.predicate.as_str() == type_pred_str)
.and_then(|t| value_as_string(&t.object))
.unwrap_or("unknown")
.to_string();
let agent_pred_str = format!("{}:event:agent", ns);
let agent_id = all_triples
.iter()
.find(|t| t.predicate.as_str() == agent_pred_str)
.and_then(|t| t.object.as_node())
.and_then(|n| n.as_name())
.map(|s| s.strip_prefix(&agent_prefix).unwrap_or(s).to_string())
.unwrap_or_default();
let ts_pred_str = format!("{}:event:timestamp", ns);
let timestamp = all_triples
.iter()
.find(|t| t.predicate.as_str() == ts_pred_str)
.and_then(|t| value_as_string(&t.object))
.unwrap_or("")
.to_string();
chain.push(CausalNode {
event_id: current_id.clone(),
event_type: event_type.clone(),
agent_id,
timestamp,
summary: format!("{} event", event_type),
});
let parent_pred_str = format!("{}:event:parent_event", ns);
match all_triples
.iter()
.find(|t| t.predicate.as_str() == parent_pred_str)
.and_then(|t| t.object.as_node())
.and_then(|n| n.as_name())
.map(|s| s.strip_prefix(&event_prefix).unwrap_or(s).to_string())
{
Some(parent_id) => current_id = parent_id,
None => break,
}
}
chain.reverse();
Json(CausalChainResponse { chain })
}
pub fn observability_router() -> axum::Router<AppState> {
axum::Router::new()
.route("/api/v1/events", axum::routing::post(batch_store_events))
.route("/api/v1/events", axum::routing::get(query_events))
.route(
"/api/v1/events/{id}/chain",
axum::routing::get(get_causal_chain),
)
}