use aion_core::Event;
use super::nif_context::NifContext;
use super::nif_query::{pending_reply_is_live, take_pending_reply};
use super::nif_state::{EngineNifState, PendingQuery};
pub(super) const QUERY_SENTINEL_PREFIX: &str = "aion_query:";
pub(super) fn take_pending_query_sentinel(state: &EngineNifState, pid: u64) -> Option<String> {
if state.servicing_queries.contains_key(&pid) {
return None;
}
loop {
let pending = {
let mut queue = state.pending_queries.get_mut(&pid)?;
queue.pop_front()?
};
match pending_reply_is_live(state, &pending.query_id) {
Ok(true) => {
state
.servicing_queries
.insert(pid, pending.query_id.clone());
return Some(sentinel_payload(&pending));
}
Ok(false) => {
if let Ok(Some(stale)) = take_pending_reply(state, &pending.query_id) {
drop(stale);
}
}
Err(error) => {
tracing::warn!(
pid,
query_id = %pending.query_id,
error = %error,
"query pump could not inspect the pending reply registry; skipping query"
);
}
}
}
}
pub(super) fn ensure_not_servicing_query(
state: &EngineNifState,
pid: u64,
operation: &str,
) -> Result<(), String> {
match state.servicing_queries.get(&pid) {
Some(entry) => Err(format!(
"query_servicing:{operation} is forbidden while query {} is being serviced; \
query handlers are read-only",
entry.value()
)),
None => Ok(()),
}
}
pub(super) fn clear_servicing_query(state: &EngineNifState, pid: u64, query_id: &str) {
state
.servicing_queries
.remove_if(&pid, |_, servicing| servicing == query_id);
}
pub(super) fn is_mid_replay(context: &NifContext) -> bool {
let handle = context.workflow_handle();
let recorded = recorded_command_counts(context.history());
handle.activity_ordinals_allocated() < recorded.activities
|| handle.timer_ordinals_allocated() < recorded.anonymous_timers
|| handle.child_ordinals_allocated() < recorded.children
}
struct RecordedCommandCounts {
activities: u64,
anonymous_timers: u64,
children: u64,
}
fn recorded_command_counts(history: &[Event]) -> RecordedCommandCounts {
let mut counts = RecordedCommandCounts {
activities: 0,
anonymous_timers: 0,
children: 0,
};
for event in history {
match event {
Event::ActivityScheduled { .. } => counts.activities += 1,
Event::TimerStarted { timer_id, .. } if timer_id.sequence_position().is_some() => {
counts.anonymous_timers += 1;
}
Event::ChildWorkflowStarted { .. } => counts.children += 1,
_ => {}
}
}
counts
}
fn sentinel_payload(pending: &PendingQuery) -> String {
let json = serde_json::json!({
"query_id": pending.query_id,
"name": pending.name,
});
format!("{QUERY_SENTINEL_PREFIX}{json}")
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tokio::sync::oneshot;
use super::super::nif_state::{EngineNifState, PendingAwait, PendingQuery};
use super::{clear_servicing_query, ensure_not_servicing_query, take_pending_query_sentinel};
type TestResult = Result<(), Box<dyn std::error::Error>>;
fn queue_query(
state: &EngineNifState,
pid: u64,
query_id: &str,
name: &str,
) -> Result<oneshot::Receiver<crate::query::QueryResult>, String> {
let (sender, receiver) = oneshot::channel();
crate::runtime::nif_query::insert_pending_reply(state, query_id.to_owned(), pid, sender)?;
state
.pending_queries
.entry(pid)
.or_default()
.push_back(PendingQuery {
query_id: query_id.to_owned(),
name: name.to_owned(),
});
Ok(receiver)
}
#[test]
fn sentinel_pops_one_query_sets_servicing_and_preserves_await_pin() -> TestResult {
let state = Arc::new(EngineNifState::default());
state
.pending_awaits
.insert(7, PendingAwait::Signal { index: 3 });
let _receiver = queue_query(&state, 7, "q-1", "state")?;
let sentinel =
take_pending_query_sentinel(&state, 7).ok_or("expected a sentinel for the query")?;
assert!(sentinel.starts_with("aion_query:"));
let json: serde_json::Value = serde_json::from_str(
sentinel
.strip_prefix("aion_query:")
.ok_or("missing prefix")?,
)?;
assert_eq!(json["query_id"], "q-1");
assert_eq!(json["name"], "state");
assert_eq!(
state.servicing_queries.get(&7).map(|e| e.clone()),
Some("q-1".to_owned())
);
assert!(matches!(
state.pending_awaits.get(&7).map(|e| e.clone()),
Some(PendingAwait::Signal { index: 3 })
));
let _second_receiver = queue_query(&state, 7, "q-2", "state")?;
assert!(take_pending_query_sentinel(&state, 7).is_none());
Ok(())
}
#[test]
fn sentinel_skips_queries_whose_caller_stopped_waiting() -> TestResult {
let state = Arc::new(EngineNifState::default());
let dead_receiver = queue_query(&state, 9, "dead", "state")?;
drop(dead_receiver);
let _live_receiver = queue_query(&state, 9, "live", "state")?;
let sentinel =
take_pending_query_sentinel(&state, 9).ok_or("expected the live query's sentinel")?;
assert!(sentinel.contains("\"query_id\":\"live\""));
assert!(!crate::runtime::nif_query::pending_reply_is_live(
&state, "dead"
)?);
Ok(())
}
#[test]
fn empty_queue_returns_none() {
let state = EngineNifState::default();
assert!(take_pending_query_sentinel(&state, 1).is_none());
}
#[test]
fn servicing_guard_refuses_then_clears() -> TestResult {
let state = Arc::new(EngineNifState::default());
let _receiver = queue_query(&state, 5, "q-9", "state")?;
let _sentinel =
take_pending_query_sentinel(&state, 5).ok_or("expected sentinel before guard")?;
let refused = ensure_not_servicing_query(&state, 5, "dispatch_activity")
.err()
.ok_or("recording during servicing was not refused")?;
assert!(refused.starts_with("query_servicing:dispatch_activity"));
assert!(refused.contains("q-9"));
clear_servicing_query(&state, 5, "other");
assert!(ensure_not_servicing_query(&state, 5, "sleep").is_err());
clear_servicing_query(&state, 5, "q-9");
ensure_not_servicing_query(&state, 5, "sleep").map_err(Box::<dyn std::error::Error>::from)
}
}