use crate::api::ContextApi;
use crate::context::generation::contracts::{
GeneratedMetadataBuilder, GenerationOrchestrationRequest,
};
use crate::context::generation::orchestration::execute_generation_request;
use crate::error::ApiError;
use crate::metadata::frame_types::FrameMetadata;
use crate::metadata::frame_write_contract::build_generated_metadata;
use crate::telemetry::{
ProgressRuntime, ProviderLifecycleEventData, QueueEventData, QueueStatsEventData,
};
use crate::types::{FrameID, NodeID};
use hex;
use parking_lot::RwLock;
use serde_json::json;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{oneshot, Mutex, Notify, Semaphore};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
/// Scheduling priority of a generation request; higher values are served first
/// by the queue's max-heap ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Background work.
    Low = 0,
    /// Default priority.
    Normal = 1,
    /// Time-sensitive work.
    High = 2,
    /// Serve as soon as possible.
    Urgent = 3,
}
/// Process-unique identifier for a queued generation request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Allocate the next id from a process-wide monotonically increasing
    /// counter (starting at 1). Relaxed ordering is sufficient: only the
    /// uniqueness of the value matters, not its ordering with other memory.
    pub fn next() -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        Self(NEXT_ID.fetch_add(1, Ordering::Relaxed))
    }

    /// Raw numeric value of this id (used in telemetry payloads).
    pub fn as_u64(self) -> u64 {
        self.0
    }
}
/// Telemetry destination for queue events: the session they belong to and the
/// progress runtime used for best-effort emission.
#[derive(Clone)]
pub struct QueueEventContext {
    // Session the emitted events are attributed to.
    pub session_id: String,
    // Sink for best-effort event emission.
    pub progress: Arc<ProgressRuntime>,
}
/// Caller-supplied knobs for a generation request.
#[derive(Debug, Clone, Default)]
pub struct GenerationRequestOptions {
    // When true, skip the existing-head short-circuit and regenerate anyway.
    pub force: bool,
    // Optional plan association; plan-attached requests sort ahead of others
    // in the queue ordering.
    pub plan_id: Option<String>,
}
/// Deduplication key: at most one generation request may be in flight per
/// (node, agent, frame_type) triple.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RequestIdentity {
    node_id: NodeID,
    agent_id: String,
    frame_type: String,
}
impl RequestIdentity {
fn new(node_id: NodeID, agent_id: &str, frame_type: &str) -> Self {
Self {
node_id,
agent_id: agent_id.to_string(),
frame_type: frame_type.to_string(),
}
}
fn from_request(request: &GenerationRequest) -> Self {
Self::new(request.node_id, &request.agent_id, &request.frame_type)
}
}
/// Bookkeeping for one in-flight request identity: the canonical request id
/// plus the completion channels of every caller waiting on that result.
#[derive(Debug)]
struct DedupeEntry {
    request_id: RequestId,
    // Senders notified by the worker once the request permanently completes.
    waiters: Vec<oneshot::Sender<Result<FrameID, ApiError>>>,
}
impl DedupeEntry {
fn new(request_id: RequestId) -> Self {
Self {
request_id,
waiters: Vec::new(),
}
}
}
/// One queued frame-generation job.
#[derive(Debug)]
pub struct GenerationRequest {
    // Process-unique id; also the basis of equality.
    pub request_id: RequestId,
    pub node_id: NodeID,
    pub agent_id: String,
    pub provider_name: String,
    pub frame_type: String,
    pub priority: Priority,
    // Number of retries already attempted for this request.
    pub retry_count: usize,
    // Enqueue time; used for FIFO ordering within a priority level.
    pub created_at: Instant,
    // Optional channel back to a synchronous caller. Not preserved by `Clone`
    // (oneshot senders cannot be cloned); this module delivers completions via
    // the dedupe index's waiter list instead.
    pub completion_tx: Option<oneshot::Sender<Result<FrameID, ApiError>>>,
    pub options: GenerationRequestOptions,
}
impl Clone for GenerationRequest {
    /// Clones every field except `completion_tx`: a `oneshot::Sender` cannot
    /// be cloned, so copies (e.g. requests re-pushed onto the heap for retry)
    /// are detached from any caller's channel. Waiters are tracked separately
    /// in the queue's dedupe index, so no completion is lost by this.
    fn clone(&self) -> Self {
        Self {
            request_id: self.request_id,
            node_id: self.node_id,
            agent_id: self.agent_id.clone(),
            provider_name: self.provider_name.clone(),
            frame_type: self.frame_type.clone(),
            priority: self.priority,
            retry_count: self.retry_count,
            created_at: self.created_at,
            completion_tx: None,
            options: self.options.clone(),
        }
    }
}
impl PartialEq for GenerationRequest {
    /// Requests are identified solely by their unique `request_id`; all other
    /// fields are ignored for equality.
    fn eq(&self, other: &Self) -> bool {
        self.request_id == other.request_id
    }
}
impl Eq for GenerationRequest {}
impl Ord for GenerationRequest {
    /// Heap ordering for the generation queue (`BinaryHeap` is a max-heap, so
    /// "greater" pops first).
    ///
    /// Precedence: plan-attached requests first, then higher `priority`, then
    /// older `created_at` (FIFO within a priority band), and finally the
    /// smaller (older) `request_id`. The id tie-break is a correctness fix:
    /// without it, two distinct requests with equal plan-rank, priority, and
    /// creation instant compared `Equal` while `eq` (which compares ids)
    /// returned `false`, violating the `Ord`/`PartialEq` consistency contract
    /// that `BinaryHeap` relies on. It also makes pop order deterministic.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Rank 1 when attached to a plan, 0 otherwise.
        let plan_rank = |req: &Self| u8::from(req.options.plan_id.is_some());
        plan_rank(self)
            .cmp(&plan_rank(other))
            .then_with(|| self.priority.cmp(&other.priority))
            // Earlier creation time wins, so reverse the natural Instant order.
            .then_with(|| self.created_at.cmp(&other.created_at).reverse())
            // Smaller (older) id wins; ties the total order to identity.
            .then_with(|| other.request_id.as_u64().cmp(&self.request_id.as_u64()))
    }
}
impl PartialOrd for GenerationRequest {
    /// Delegates to `Ord::cmp`, as required for a type with a total order.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// Tunables for the frame generation queue.
#[derive(Debug, Clone)]
pub struct GenerationConfig {
    /// Concurrent in-flight requests allowed per agent.
    pub max_concurrent_per_agent: usize,
    /// Batch size used by batch-oriented callers.
    pub batch_size: usize,
    /// Retries allowed before a request fails permanently.
    pub max_retry_attempts: usize,
    /// Delay between retry attempts, in milliseconds.
    pub retry_delay_ms: u64,
    /// Minimum spacing between requests per agent, in milliseconds;
    /// `None` disables rate limiting.
    pub rate_limit_ms: Option<u64>,
    /// Hard cap on queued (pending) requests.
    pub max_queue_size: usize,
    /// Number of background worker tasks to spawn.
    pub workers_per_agent: usize,
}

impl Default for GenerationConfig {
    fn default() -> Self {
        GenerationConfig {
            max_concurrent_per_agent: 3,
            batch_size: 50,
            max_retry_attempts: 3,
            retry_delay_ms: 1000,
            rate_limit_ms: Some(100),
            max_queue_size: 10_000,
            workers_per_agent: 2,
        }
    }
}
/// Snapshot of queue activity: current gauges (`pending`, `processing`) and
/// cumulative counters (`completed`, `failed`).
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    // Requests waiting in the heap.
    pub pending: usize,
    // Requests currently being executed by workers.
    pub processing: usize,
    // Requests that finished successfully.
    pub completed: usize,
    // Requests that failed permanently (non-retryable or retries exhausted).
    pub failed: usize,
}
/// Per-agent throttle: a semaphore bounding concurrent requests plus an
/// optional minimum delay between consecutive requests.
struct AgentRateLimiter {
    // Bounds concurrent in-flight requests for this agent.
    semaphore: Arc<Semaphore>,
    // Time of the most recent admitted request, keyed by agent id.
    last_request: Arc<RwLock<HashMap<String, Instant>>>,
    // Minimum spacing between requests; `None` disables the delay entirely.
    min_delay: Option<Duration>,
}
impl AgentRateLimiter {
    /// Build a limiter allowing `max_concurrent` in-flight requests and an
    /// optional minimum spacing of `min_delay_ms` between requests.
    fn new(max_concurrent: usize, min_delay_ms: Option<u64>) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            last_request: Arc::new(RwLock::new(HashMap::new())),
            min_delay: min_delay_ms.map(Duration::from_millis),
        }
    }

    /// Acquire a concurrency permit for `agent_id`, sleeping first if the
    /// previous request for that agent was admitted less than `min_delay` ago.
    ///
    /// # Errors
    /// Returns `ApiError::ProviderRateLimit` if the semaphore has been closed.
    ///
    /// NOTE: the parking_lot read guard is confined to the inner block so it
    /// is dropped *before* the `.await` on `sleep` — do not hoist it out.
    async fn acquire(&self, agent_id: &str) -> Result<tokio::sync::SemaphorePermit<'_>, ApiError> {
        let permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|_| ApiError::ProviderRateLimit("Semaphore closed".to_string()))?;
        if let Some(min_delay) = self.min_delay {
            // Compute the remaining wait, if any; the read guard is released
            // at the end of this block, before any await point.
            let sleep_duration = {
                let last = self.last_request.read();
                if let Some(last_time) = last.get(agent_id) {
                    let elapsed = last_time.elapsed();
                    if elapsed < min_delay {
                        Some(min_delay - elapsed)
                    } else {
                        None
                    }
                } else {
                    None
                }
            };
            if let Some(duration) = sleep_duration {
                sleep(duration).await;
            }
            {
                // Record the moment this request was admitted.
                let mut last = self.last_request.write();
                last.insert(agent_id.to_string(), Instant::now());
            }
        }
        Ok(permit)
    }
}
/// Priority work queue that schedules context-frame generation across
/// background worker tasks, with per-agent rate limiting, request
/// deduplication, retry handling, and optional telemetry events.
pub struct FrameGenerationQueue {
    // Max-heap of pending requests, ordered by `GenerationRequest`'s `Ord`.
    queue: Arc<Mutex<BinaryHeap<GenerationRequest>>>,
    // Wakes idle workers when new work arrives.
    notify: Arc<Notify>,
    // Join handles of spawned workers; populated by `start`, drained by `stop`.
    workers: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
    config: GenerationConfig,
    api: Arc<ContextApi>,
    // Per-agent limiters, created lazily by workers on first use.
    rate_limiters: Arc<RwLock<HashMap<String, AgentRateLimiter>>>,
    // Set by `start`, cleared by `stop`; workers exit when false.
    running: Arc<RwLock<bool>>,
    stats: Arc<RwLock<QueueStats>>,
    // When `Some`, queue/provider lifecycle events are emitted here.
    event_context: Option<QueueEventContext>,
    // Maps each in-flight (node, agent, frame_type) identity to its canonical
    // request and the callers waiting on it.
    dedupe_index: Arc<Mutex<HashMap<RequestIdentity, DedupeEntry>>>,
    // Builds the metadata attached to generated frames.
    metadata_builder: Arc<GeneratedMetadataBuilder>,
}
impl FrameGenerationQueue {
/// Create a queue with no telemetry event context.
pub fn new(api: Arc<ContextApi>, config: GenerationConfig) -> Self {
    Self::with_event_context(api, config, None)
}

/// Create a queue that emits telemetry through `event_context` (when `Some`),
/// using the default generated-frame metadata builder.
pub fn with_event_context(
    api: Arc<ContextApi>,
    config: GenerationConfig,
    event_context: Option<QueueEventContext>,
) -> Self {
    Self::with_custom_metadata_builder(api, config, event_context, build_generated_metadata)
}

/// Fully parameterized constructor: callers may inject a custom
/// `metadata_builder` used when persisting generated frames.
pub fn with_custom_metadata_builder<F>(
    api: Arc<ContextApi>,
    config: GenerationConfig,
    event_context: Option<QueueEventContext>,
    metadata_builder: F,
) -> Self
where
    F: Fn(&str, &str, &str, &str, &str) -> FrameMetadata + Send + Sync + 'static,
{
    Self {
        queue: Arc::new(Mutex::new(BinaryHeap::new())),
        notify: Arc::new(Notify::new()),
        workers: Arc::new(RwLock::new(Vec::new())),
        config,
        api,
        rate_limiters: Arc::new(RwLock::new(HashMap::new())),
        running: Arc::new(RwLock::new(false)),
        stats: Arc::new(RwLock::new(QueueStats::default())),
        event_context,
        dedupe_index: Arc::new(Mutex::new(HashMap::new())),
        metadata_builder: Arc::new(metadata_builder),
    }
}
/// Enqueue an asynchronous (fire-and-forget) generation request.
///
/// Deduplicates against in-flight work for the same (node, agent, frame_type)
/// identity: if a matching request exists, its `RequestId` is returned and no
/// new work is queued.
///
/// # Errors
/// Returns `ApiError::ConfigError` when the queue is at `max_queue_size`.
pub async fn enqueue(
    &self,
    node_id: NodeID,
    agent_id: String,
    provider_name: String,
    frame_type: Option<String>,
    priority: Priority,
) -> Result<RequestId, ApiError> {
    // Lock order (queue, then dedupe) matches every other enqueue path to
    // avoid deadlock between concurrent producers.
    let mut queue = self.queue.lock().await;
    let mut dedupe = self.dedupe_index.lock().await;
    // A missing frame_type defaults to the agent-scoped "context-<agent>" type.
    let resolved_frame_type = frame_type
        .clone()
        .unwrap_or_else(|| format!("context-{}", agent_id));
    let identity = RequestIdentity::new(node_id, &agent_id, &resolved_frame_type);
    if let Some(existing_entry) = dedupe.get(&identity) {
        // Same work already queued/in flight: report the existing id.
        let existing_id = existing_entry.request_id;
        self.emit_queue_event(
            "request_deduplicated",
            QueueEventData {
                node_id: hex::encode(node_id),
                agent_id,
                provider_name,
                frame_type: resolved_frame_type,
                request_id: Some(existing_id.as_u64()),
                retry_count: None,
                duration_ms: None,
            },
        );
        return Ok(existing_id);
    }
    if queue.len() >= self.config.max_queue_size {
        warn!(
            queue_size = queue.len(),
            max_size = self.config.max_queue_size,
            "Generation queue is full, dropping request"
        );
        return Err(ApiError::ConfigError(
            "Generation queue is full".to_string(),
        ));
    }
    let request_id = RequestId::next();
    let frame_type = resolved_frame_type;
    let request = GenerationRequest {
        request_id,
        node_id,
        agent_id: agent_id.clone(),
        provider_name: provider_name.clone(),
        frame_type: frame_type.clone(),
        priority,
        retry_count: 0,
        // Fire-and-forget: no caller waits on this request's completion.
        created_at: Instant::now(),
        completion_tx: None,
        options: GenerationRequestOptions::default(),
    };
    queue.push(request);
    dedupe.insert(identity, DedupeEntry::new(request_id));
    {
        let mut stats = self.stats.write();
        stats.pending += 1;
    }
    self.emit_queue_stats_event();
    // Wake one idle worker; others will find the work via their poll timeout.
    self.notify.notify_one();
    let queue_size = queue.len();
    drop(dedupe);
    drop(queue);
    debug!(
        request_id = ?request_id,
        node_id = %hex::encode(node_id),
        agent_id = %agent_id,
        provider_name = %provider_name,
        priority = ?priority,
        queue_size = queue_size,
        "Enqueued generation request"
    );
    self.emit_queue_event(
        "request_enqueued",
        QueueEventData {
            node_id: hex::encode(node_id),
            agent_id: agent_id.clone(),
            provider_name: provider_name.clone(),
            frame_type: frame_type.clone(),
            request_id: Some(request_id.as_u64()),
            retry_count: Some(0),
            duration_ms: None,
        },
    );
    Ok(request_id)
}
/// Enqueue a request and wait for its generated `FrameID` (or until
/// `timeout` elapses). Convenience wrapper over
/// `enqueue_and_wait_with_options` with default options.
pub async fn enqueue_and_wait(
    &self,
    node_id: NodeID,
    agent_id: String,
    provider_name: String,
    frame_type: Option<String>,
    priority: Priority,
    timeout: Option<Duration>,
) -> Result<FrameID, ApiError> {
    self.enqueue_and_wait_with_options(
        node_id,
        agent_id,
        provider_name,
        frame_type,
        priority,
        timeout,
        GenerationRequestOptions::default(),
    )
    .await
}
/// Enqueue a generation request and wait for its completion result.
///
/// Short-circuits in two cases:
/// * an identical request is already queued/in flight — this caller is
///   registered as an extra waiter on that request's dedupe entry;
/// * `options.force` is false and a head frame already exists for
///   (node, frame_type) — the existing `FrameID` is returned without queuing.
///
/// # Errors
/// Queue-full, head-lookup failures, generation failures, and waiting
/// timeouts are all surfaced as `ApiError`.
pub async fn enqueue_and_wait_with_options(
    &self,
    node_id: NodeID,
    agent_id: String,
    provider_name: String,
    frame_type: Option<String>,
    priority: Priority,
    timeout: Option<Duration>,
    options: GenerationRequestOptions,
) -> Result<FrameID, ApiError> {
    let (tx, rx) = oneshot::channel();
    // Same lock order as `enqueue` (queue, then dedupe).
    let mut queue = self.queue.lock().await;
    let mut dedupe = self.dedupe_index.lock().await;
    let resolved_frame_type = frame_type.unwrap_or_else(|| format!("context-{}", agent_id));
    let identity = RequestIdentity::new(node_id, &agent_id, &resolved_frame_type);
    if let Some(existing_entry) = dedupe.get_mut(&identity) {
        // Piggyback on the in-flight request: the worker notifies all waiters.
        existing_entry.waiters.push(tx);
        let existing_id = existing_entry.request_id;
        drop(dedupe);
        drop(queue);
        self.emit_queue_event(
            "request_deduplicated",
            QueueEventData {
                node_id: hex::encode(node_id),
                agent_id,
                provider_name,
                frame_type: resolved_frame_type,
                request_id: Some(existing_id.as_u64()),
                retry_count: None,
                duration_ms: None,
            },
        );
        return self.wait_for_generation_completion(rx, timeout).await;
    }
    if !options.force {
        // NOTE(review): `get_head` is a synchronous call made while both async
        // locks are held — presumably cheap; verify it cannot block for long.
        if let Some(existing_head) = self.api.get_head(&node_id, &resolved_frame_type)? {
            drop(dedupe);
            drop(queue);
            self.emit_queue_event(
                "request_deduplicated",
                QueueEventData {
                    node_id: hex::encode(node_id),
                    agent_id,
                    provider_name,
                    frame_type: resolved_frame_type,
                    request_id: None,
                    retry_count: None,
                    duration_ms: None,
                },
            );
            return Ok(existing_head);
        }
    }
    if queue.len() >= self.config.max_queue_size {
        warn!(
            queue_size = queue.len(),
            max_size = self.config.max_queue_size,
            "Generation queue is full, dropping request"
        );
        return Err(ApiError::ConfigError(
            "Generation queue is full".to_string(),
        ));
    }
    let request_id = RequestId::next();
    let frame_type = resolved_frame_type;
    let request = GenerationRequest {
        request_id,
        node_id,
        agent_id: agent_id.clone(),
        provider_name: provider_name.clone(),
        frame_type: frame_type.clone(),
        priority,
        retry_count: 0,
        created_at: Instant::now(),
        // Completion reaches this caller via the dedupe entry's waiter list.
        completion_tx: None,
        options,
    };
    queue.push(request);
    let mut entry = DedupeEntry::new(request_id);
    entry.waiters.push(tx);
    dedupe.insert(identity, entry);
    {
        let mut stats = self.stats.write();
        stats.pending += 1;
    }
    self.emit_queue_stats_event();
    self.notify.notify_one();
    drop(dedupe);
    drop(queue);
    debug!(
        request_id = ?request_id,
        node_id = %hex::encode(node_id),
        agent_id = %agent_id,
        provider_name = %provider_name,
        priority = ?priority,
        "Enqueued sync generation request"
    );
    self.emit_queue_event(
        "request_enqueued",
        QueueEventData {
            node_id: hex::encode(node_id),
            agent_id: agent_id.clone(),
            provider_name: provider_name.clone(),
            frame_type: frame_type.clone(),
            request_id: Some(request_id.as_u64()),
            retry_count: Some(0),
            duration_ms: None,
        },
    );
    self.wait_for_generation_completion(rx, timeout).await
}
/// Enqueue many requests at once. Returns one `RequestId` per input, in
/// input order; duplicates (within the batch or against in-flight work) map
/// to the existing request's id.
///
/// All-or-nothing on capacity: if the new (non-duplicate) requests would push
/// the queue past `max_queue_size`, nothing is enqueued and an error is
/// returned. NOTE(review): "request_deduplicated" events emitted while
/// scanning the batch are not rolled back in that error case.
pub async fn enqueue_batch(
    &self,
    requests: Vec<(NodeID, String, String, Option<String>, Priority)>,
) -> Result<Vec<RequestId>, ApiError> {
    let mut queue = self.queue.lock().await;
    let mut dedupe = self.dedupe_index.lock().await;
    let mut request_ids: Vec<RequestId> = Vec::new();
    let mut new_requests = Vec::new();
    // Identities introduced by earlier items of this same batch.
    let mut staged = HashMap::new();
    let mut enqueue_events = Vec::new();
    for (node_id, agent_id, provider_name, frame_type, priority) in requests {
        let frame_type = frame_type.unwrap_or_else(|| format!("context-{}", agent_id));
        let identity = RequestIdentity::new(node_id, &agent_id, &frame_type);
        // Duplicate of an earlier entry in this batch?
        if let Some(existing_id) = staged.get(&identity) {
            request_ids.push(*existing_id);
            self.emit_queue_event(
                "request_deduplicated",
                QueueEventData {
                    node_id: hex::encode(node_id),
                    agent_id,
                    provider_name,
                    frame_type,
                    request_id: Some(existing_id.as_u64()),
                    retry_count: None,
                    duration_ms: None,
                },
            );
            continue;
        }
        // Duplicate of work already queued or in flight?
        if let Some(existing_entry) = dedupe.get(&identity) {
            request_ids.push(existing_entry.request_id);
            self.emit_queue_event(
                "request_deduplicated",
                QueueEventData {
                    node_id: hex::encode(node_id),
                    agent_id,
                    provider_name,
                    frame_type,
                    request_id: Some(existing_entry.request_id.as_u64()),
                    retry_count: None,
                    duration_ms: None,
                },
            );
            continue;
        }
        let request_id = RequestId::next();
        let request = GenerationRequest {
            request_id,
            node_id,
            agent_id: agent_id.clone(),
            provider_name: provider_name.clone(),
            frame_type: frame_type.clone(),
            priority,
            retry_count: 0,
            created_at: Instant::now(),
            completion_tx: None,
            options: GenerationRequestOptions::default(),
        };
        request_ids.push(request_id);
        staged.insert(identity.clone(), request_id);
        new_requests.push((identity, request));
    }
    // Capacity check before mutating the queue: nothing has been pushed yet.
    if queue.len() + new_requests.len() > self.config.max_queue_size {
        warn!(
            queue_size = queue.len(),
            batch_size = new_requests.len(),
            max_size = self.config.max_queue_size,
            "Batch would exceed queue size limit"
        );
        return Err(ApiError::ConfigError(
            "Batch would exceed generation queue size limit".to_string(),
        ));
    }
    let new_count = new_requests.len();
    for (identity, request) in new_requests {
        let request_id = request.request_id;
        // Events are collected now and emitted after the locks are released.
        enqueue_events.push(QueueEventData {
            node_id: hex::encode(request.node_id),
            agent_id: request.agent_id.clone(),
            provider_name: request.provider_name.clone(),
            frame_type: request.frame_type.clone(),
            request_id: Some(request_id.as_u64()),
            retry_count: Some(request.retry_count),
            duration_ms: None,
        });
        queue.push(request);
        dedupe.insert(identity, DedupeEntry::new(request_id));
    }
    let batch_size = new_count;
    {
        let mut stats = self.stats.write();
        stats.pending += batch_size;
    }
    self.emit_queue_stats_event();
    // Wake at most one worker per new request, capped at the worker count.
    let notify_count = batch_size.min(self.config.workers_per_agent);
    for _ in 0..notify_count {
        self.notify.notify_one();
    }
    drop(dedupe);
    drop(queue);
    debug!(
        batch_size = batch_size,
        "Enqueued batch of generation requests"
    );
    for payload in enqueue_events {
        self.emit_queue_event("request_enqueued", payload);
    }
    Ok(request_ids)
}
/// Spawn the configured number of background worker tasks.
///
/// Idempotent: calling `start` while the queue is already running is a no-op.
pub fn start(&self) -> Result<(), ApiError> {
    {
        let mut running = self.running.write();
        if *running {
            return Ok(());
        }
        *running = true;
    }
    let mut workers = self.workers.write();
    for worker_id in 0..self.config.workers_per_agent {
        let handle = tokio::spawn({
            // Clone the shared state each worker task needs.
            let queue = Arc::clone(&self.queue);
            let notify = Arc::clone(&self.notify);
            let api = Arc::clone(&self.api);
            let config = self.config.clone();
            let rate_limiters = Arc::clone(&self.rate_limiters);
            let running = Arc::clone(&self.running);
            let stats = Arc::clone(&self.stats);
            let event_context = self.event_context.clone();
            let dedupe_index = Arc::clone(&self.dedupe_index);
            let metadata_builder = Arc::clone(&self.metadata_builder);
            async move {
                Self::worker_loop(
                    worker_id,
                    queue,
                    notify,
                    api,
                    config,
                    rate_limiters,
                    running,
                    stats,
                    event_context,
                    dedupe_index,
                    metadata_builder,
                )
                .await;
            }
        });
        workers.push(handle);
    }
    info!(
        worker_count = workers.len(),
        "Started frame generation queue workers"
    );
    Ok(())
}
/// Signal all workers to exit and wait for them to finish.
///
/// Idempotent: calling `stop` while already stopped is a no-op.
pub async fn stop(&self) -> Result<(), ApiError> {
    {
        // Guard is dropped before any await point below.
        let mut flag = self.running.write();
        if !*flag {
            return Ok(());
        }
        *flag = false;
    }
    // Take ownership of the handles so the workers list is left empty.
    let handles = std::mem::take(&mut *self.workers.write());
    for handle in handles {
        let _ = handle.await;
    }
    info!("Stopped frame generation queue workers");
    Ok(())
}
/// Snapshot of the queue's current counters.
pub fn stats(&self) -> QueueStats {
    self.stats.read().clone()
}
/// Poll (every 100ms) until the queue is empty and no request is being
/// processed, or until `timeout` elapses.
///
/// # Errors
/// Returns `ApiError::ConfigError` on timeout.
pub async fn wait_for_completion(&self, timeout: Option<Duration>) -> Result<(), ApiError> {
    let started = Instant::now();
    loop {
        // Locks are confined to this block so they are not held while sleeping.
        let drained = {
            let queue = self.queue.lock().await;
            queue.is_empty() && self.stats.read().processing == 0
        };
        if drained {
            return Ok(());
        }
        if let Some(limit) = timeout {
            if started.elapsed() >= limit {
                return Err(ApiError::ConfigError(
                    "Timeout waiting for queue to drain".to_string(),
                ));
            }
        }
        sleep(Duration::from_millis(100)).await;
    }
}
/// Await a generation result from `receiver`, optionally bounded by `timeout`.
///
/// Distinguishes three failures: the timeout elapsing, the sender side being
/// dropped (channel closed), and the generation itself failing (the error
/// received through the channel is propagated as-is).
async fn wait_for_generation_completion(
    &self,
    receiver: oneshot::Receiver<Result<FrameID, ApiError>>,
    timeout: Option<Duration>,
) -> Result<FrameID, ApiError> {
    match timeout {
        None => receiver
            .await
            .map_err(|_| ApiError::ConfigError("Completion channel closed".to_string()))?,
        Some(limit) => {
            let outcome = tokio::time::timeout(limit, receiver).await;
            let received = outcome.map_err(|_| {
                ApiError::ConfigError("Timeout waiting for generation".to_string())
            })?;
            received
                .map_err(|_| ApiError::ConfigError("Completion channel closed".to_string()))?
        }
    }
}
/// Long-running worker task: pops requests off the shared heap, applies
/// per-agent rate limiting, executes them, maintains stats/telemetry, and
/// handles retry and completion bookkeeping. Runs until `running` is cleared
/// by `stop`.
async fn worker_loop(
    worker_id: usize,
    queue: Arc<Mutex<BinaryHeap<GenerationRequest>>>,
    notify: Arc<Notify>,
    api: Arc<ContextApi>,
    config: GenerationConfig,
    rate_limiters: Arc<RwLock<HashMap<String, AgentRateLimiter>>>,
    running: Arc<RwLock<bool>>,
    stats: Arc<RwLock<QueueStats>>,
    event_context: Option<QueueEventContext>,
    dedupe_index: Arc<Mutex<HashMap<RequestIdentity, DedupeEntry>>>,
    metadata_builder: Arc<GeneratedMetadataBuilder>,
) {
    debug!(worker_id, "Worker started");
    while *running.read() {
        // Pop the highest-priority request, releasing the lock immediately.
        let request = {
            let mut queue_guard = queue.lock().await;
            queue_guard.pop()
        };
        let Some(mut request) = request else {
            // Queue empty: wait for a wake-up, but cap the wait at 100ms so a
            // notification sent before `notified()` was registered — or a stop
            // request — cannot stall this worker indefinitely.
            let notify_future = notify.notified();
            let timeout_future = sleep(Duration::from_millis(100));
            tokio::select! {
                _ = notify_future => {
                    continue;
                }
                _ = timeout_future => {
                    continue;
                }
            }
        };
        {
            // Move the request from "pending" to "processing".
            let mut stats = stats.write();
            stats.pending = stats.pending.saturating_sub(1);
            stats.processing += 1;
        }
        Self::emit_queue_stats_event_static(stats.clone(), event_context.clone());
        Self::emit_queue_event_static(
            event_context.clone(),
            "request_processing",
            QueueEventData {
                node_id: hex::encode(request.node_id),
                agent_id: request.agent_id.clone(),
                provider_name: request.provider_name.clone(),
                frame_type: request.frame_type.clone(),
                request_id: Some(request.request_id.as_u64()),
                retry_count: Some(request.retry_count),
                duration_ms: None,
            },
        );
        // Clone the shared parts of this agent's limiter (creating it lazily)
        // so the map's write guard is not held across the `acquire().await`.
        let (semaphore, last_request, min_delay) = {
            let mut limiters = rate_limiters.write();
            let limiter = limiters.entry(request.agent_id.clone()).or_insert_with(|| {
                AgentRateLimiter::new(config.max_concurrent_per_agent, config.rate_limit_ms)
            });
            (
                Arc::clone(&limiter.semaphore),
                Arc::clone(&limiter.last_request),
                limiter.min_delay,
            )
        };
        // Local view of the limiter sharing the same semaphore/timestamps.
        let rate_limiter = AgentRateLimiter {
            semaphore,
            last_request,
            min_delay,
        };
        let request_identity = RequestIdentity::from_request(&request);
        // The permit bounds per-agent concurrency; it is held (via `_permit`)
        // until the end of this loop iteration.
        let _permit = match rate_limiter.acquire(&request.agent_id).await {
            Ok(permit) => permit,
            Err(e) => {
                error!(
                    worker_id,
                    agent_id = %request.agent_id,
                    error = %e,
                    "Failed to acquire rate limiter permit"
                );
                // Put the request back and restore the pending count.
                // NOTE(review): `request.clone()` is redundant here — the
                // request is not used again before `continue`.
                let mut queue_guard = queue.lock().await;
                queue_guard.push(request.clone());
                {
                    let mut stats = stats.write();
                    stats.processing = stats.processing.saturating_sub(1);
                    stats.pending += 1;
                }
                continue;
            }
        };
        let result = Self::process_request(
            &request,
            &api,
            &config,
            event_context.clone(),
            metadata_builder.as_ref(),
        )
        .await;
        // Decide on retry while updating counters under a single stats lock.
        let should_retry = {
            let mut stats_guard = stats.write();
            stats_guard.processing = stats_guard.processing.saturating_sub(1);
            match &result {
                Ok(_) => {
                    stats_guard.completed += 1;
                    false
                }
                Err(err) => {
                    let retry = request.retry_count < config.max_retry_attempts
                        && Self::is_retryable_error(err);
                    if retry {
                        // Failure is not counted until retries are exhausted.
                    } else {
                        stats_guard.failed += 1;
                        error!(
                            worker_id,
                            node_id = %hex::encode(request.node_id),
                            agent_id = %request.agent_id,
                            retry_count = request.retry_count,
                            error = %err,
                            "Generation request failed permanently"
                        );
                    }
                    retry
                }
            }
        };
        Self::emit_queue_stats_event_static(stats.clone(), event_context.clone());
        if !should_retry {
            // Final outcome: release the dedupe entry and fan the result out
            // to every caller that piggybacked on this request.
            let waiters = {
                let mut dedupe = dedupe_index.lock().await;
                dedupe
                    .remove(&request_identity)
                    .map(|entry| entry.waiters)
                    .unwrap_or_default()
            };
            for tx in waiters {
                // Receivers may have timed out and dropped; ignore send errors.
                let _ = tx.send(result.clone());
            }
        }
        if should_retry {
            Self::emit_provider_event_static(
                event_context.clone(),
                "provider_request_retrying",
                ProviderLifecycleEventData {
                    node_id: hex::encode(request.node_id),
                    agent_id: request.agent_id.clone(),
                    provider_name: request.provider_name.clone(),
                    frame_type: request.frame_type.clone(),
                    duration_ms: None,
                    error: None,
                    retry_count: Some(request.retry_count + 1),
                },
            );
            request.retry_count += 1;
            // Back off before re-queueing; the dedupe entry stays in place so
            // waiters remain attached to this identity.
            sleep(Duration::from_millis(config.retry_delay_ms)).await;
            let mut queue_guard = queue.lock().await;
            queue_guard.push(request.clone());
            drop(queue_guard);
            notify.notify_one();
            let mut stats_guard = stats.write();
            stats_guard.pending += 1;
            drop(stats_guard);
            Self::emit_queue_stats_event_static(stats.clone(), event_context.clone());
        }
    }
    debug!(worker_id, "Worker stopped");
}
/// Translate a queue entry into an orchestration request and execute it,
/// returning the generated frame's id.
async fn process_request(
    request: &GenerationRequest,
    api: &ContextApi,
    _config: &GenerationConfig,
    event_context: Option<QueueEventContext>,
    metadata_builder: &GeneratedMetadataBuilder,
) -> Result<FrameID, ApiError> {
    execute_generation_request(
        &GenerationOrchestrationRequest {
            request_id: request.request_id.as_u64(),
            node_id: request.node_id,
            agent_id: request.agent_id.clone(),
            provider_name: request.provider_name.clone(),
            frame_type: request.frame_type.clone(),
            retry_count: request.retry_count,
            force: request.options.force,
        },
        api,
        metadata_builder,
        event_context.as_ref(),
    )
    .await
}
/// Whether a failed generation attempt is worth retrying.
///
/// Configuration, prompt-contract, metadata-policy, and missing-provider
/// errors are permanent — retrying cannot fix them. Everything else
/// (rate limits, transient provider failures, unknown errors) is retryable.
fn is_retryable_error(error: &ApiError) -> bool {
    !matches!(
        error,
        ApiError::ConfigError(_)
            | ApiError::MissingPromptContractField { .. }
            | ApiError::FrameMetadataPolicyViolation(_)
            | ApiError::FrameMetadataUnknownKey { .. }
            | ApiError::FrameMetadataForbiddenKey { .. }
            | ApiError::FrameMetadataPerKeyBudgetExceeded { .. }
            | ApiError::FrameMetadataTotalBudgetExceeded { .. }
            | ApiError::ProviderNotConfigured(_)
    )
}
/// Instance-side convenience wrapper over `emit_queue_event_static`, using
/// this queue's configured event context.
fn emit_queue_event(&self, event_type: &str, payload: QueueEventData) {
    Self::emit_queue_event_static(self.event_context.clone(), event_type, payload);
}
/// Emit a queue lifecycle event when an event context is configured;
/// otherwise do nothing. Emission is best-effort: failures are swallowed
/// by the progress runtime.
fn emit_queue_event_static(
    event_context: Option<QueueEventContext>,
    event_type: &str,
    payload: QueueEventData,
) {
    let Some(ctx) = event_context else {
        return;
    };
    ctx.progress
        .emit_event_best_effort(&ctx.session_id, event_type, json!(payload));
}
/// Emit a provider lifecycle event when an event context is configured;
/// otherwise do nothing. Emission is best-effort.
fn emit_provider_event_static(
    event_context: Option<QueueEventContext>,
    event_type: &str,
    payload: ProviderLifecycleEventData,
) {
    let Some(ctx) = event_context else {
        return;
    };
    ctx.progress
        .emit_event_best_effort(&ctx.session_id, event_type, json!(payload));
}
/// Instance-side convenience wrapper over `emit_queue_stats_event_static`,
/// using this queue's stats and configured event context.
fn emit_queue_stats_event(&self) {
    Self::emit_queue_stats_event_static(self.stats.clone(), self.event_context.clone());
}
fn emit_queue_stats_event_static(
stats: Arc<RwLock<QueueStats>>,
event_context: Option<QueueEventContext>,
) {
if let Some(ctx) = event_context {
let snapshot = stats.read().clone();
ctx.progress.emit_event_best_effort(
&ctx.session_id,
"queue_stats",
json!(QueueStatsEventData {
pending: snapshot.pending,
processing: snapshot.processing,
completed: snapshot.completed,
failed: snapshot.failed,
}),
);
}
}
}