use std::time::Duration;
use redis::AsyncCommands;
use redis::aio::ConnectionManager;
use tracing::{debug, instrument};
use crate::agent::AgentId;
use crate::error::Result;
use crate::events::{EventStream, TownEvent};
use crate::keys::RedisKeys;
use crate::message::{Message, Priority};
/// TTL applied to an agent's activity log each time it is written (1 hour).
const ACTIVITY_TTL_SECS: u64 = 3600;
/// Maximum number of entries retained in an agent's activity log (LTRIM bound).
const ACTIVITY_MAX_ENTRIES: isize = 10;
/// Redis-backed communication channel scoped to a single town.
///
/// Wraps a shared async connection handle plus the town's key-naming
/// scheme, and exposes inbox/urgent messaging, agent and task state
/// hashes, the backlog list, and the docket streams.
#[derive(Clone)]
pub struct Channel {
    // Cloneable multiplexed connection handle; methods clone it per call
    // so they can take `&self`.
    conn: ConnectionManager,
    // Builds every Redis key/channel name for this town.
    keys: RedisKeys,
}
impl Channel {
/// Creates a channel for `town_name` using the given connection handle.
pub fn new(conn: ConnectionManager, town_name: impl Into<String>) -> Self {
    let town_name = town_name.into();
    Self {
        conn,
        keys: RedisKeys::new(town_name),
    }
}

/// Name of the town this channel is scoped to.
pub fn town_name(&self) -> &str {
    self.keys.town_name()
}

/// Borrows the underlying connection handle.
pub fn conn(&self) -> &ConnectionManager {
    &self.conn
}

/// Borrows the town's key-naming scheme.
pub fn keys(&self) -> &RedisKeys {
    &self.keys
}
// --- Key/channel name helpers: thin wrappers over `RedisKeys` ---

/// List key for an agent's normal-priority inbox.
fn inbox_key(&self, agent_id: AgentId) -> String {
    self.keys.agent_inbox(agent_id)
}
/// List key for an agent's urgent-message queue.
fn urgent_key(&self, agent_id: AgentId) -> String {
    self.keys.agent_urgent(agent_id)
}
/// Hash key holding an agent's serialized state fields.
fn state_key(&self, agent_id: AgentId) -> String {
    self.keys.agent_state(agent_id)
}
/// Hash key holding a task's serialized fields.
fn task_key(&self, task_id: crate::task::TaskId) -> String {
    self.keys.task(task_id)
}
/// List key for an agent's capped activity log.
fn activity_key(&self, agent_id: AgentId) -> String {
    self.keys.agent_activity(agent_id)
}
/// Key whose existence signals a stop request for the agent.
fn stop_key(&self, agent_id: AgentId) -> String {
    self.keys.agent_stop(agent_id)
}
/// List key for the town-wide task backlog.
fn backlog_key(&self) -> String {
    self.keys.backlog()
}
/// Stream key for docket task assignments.
fn docket_tasks_key(&self) -> String {
    self.keys.docket_tasks()
}
/// Stream key for docket lifecycle events.
fn docket_events_key(&self) -> String {
    self.keys.docket_events()
}
/// Pub/sub channel name for town-wide broadcasts.
fn broadcast_channel(&self) -> String {
    self.keys.broadcast()
}
/// Glob pattern matching every key owned by this town.
fn town_key_pattern(&self) -> String {
    self.keys.pattern_all()
}
/// Glob pattern matching agent state hashes.
fn agent_key_pattern(&self) -> String {
    self.keys.pattern_agents()
}
/// Glob pattern matching agent inbox lists.
fn inbox_key_pattern(&self) -> String {
    self.keys.pattern_inboxes()
}
/// Glob pattern matching agent stop flags.
fn stop_key_pattern(&self) -> String {
    self.keys.pattern_stops()
}
/// Glob pattern matching agent activity logs.
fn activity_key_pattern(&self) -> String {
    self.keys.pattern_activities()
}
/// Glob pattern matching agent urgent queues.
fn urgent_key_pattern(&self) -> String {
    self.keys.pattern_urgents()
}
/// Glob pattern matching task hashes.
fn task_key_pattern(&self) -> String {
    self.keys.pattern_tasks()
}
/// Delivers `message` to the recipient's inbox list.
///
/// Urgent/High messages are LPUSHed to the head of the list so they are
/// consumed before older traffic; Normal/Low messages are RPUSHed to the
/// tail, preserving FIFO order among themselves.
#[instrument(skip(self, message), fields(to = %message.to, msg_type = ?message.msg_type))]
pub async fn send(&self, message: &Message) -> Result<()> {
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(message.to);
    let serialized = serde_json::to_string(message)?;
    match message.priority {
        Priority::Urgent | Priority::High => {
            // Head insert: jumps the queue.
            let _: () = conn.lpush(&inbox_key, &serialized).await?;
        }
        Priority::Normal | Priority::Low => {
            // Tail insert: normal FIFO delivery.
            let _: () = conn.rpush(&inbox_key, &serialized).await?;
        }
    }
    debug!("Sent message {} to {}", message.id, message.to);
    Ok(())
}
/// Pushes `message` onto the recipient's dedicated urgent queue
/// (separate from the regular inbox).
#[instrument(skip(self, message))]
pub async fn send_urgent(&self, message: &Message) -> Result<()> {
    let mut conn = self.conn.clone();
    let urgent_key = self.urgent_key(message.to);
    let data = serde_json::to_string(message)?;
    let _: () = conn.lpush(&urgent_key, &data).await?;
    debug!("Sent URGENT message {} to {}", message.id, message.to);
    Ok(())
}

/// Drains and returns every message in the agent's urgent queue.
///
/// NOTE(review): `send_urgent` LPUSHes and this LPOPs, so messages come
/// back newest-first — confirm that LIFO ordering is intended.
/// A malformed entry aborts the drain with an error (entries popped so
/// far are lost from the queue).
#[instrument(skip(self))]
pub async fn receive_urgent(&self, agent_id: AgentId) -> Result<Vec<Message>> {
    let mut conn = self.conn.clone();
    let urgent_key = self.urgent_key(agent_id);
    let mut messages = Vec::new();
    loop {
        // LPOP without a count returns one element, or nil when empty.
        let result: Option<String> = conn.lpop(&urgent_key, None).await?;
        match result {
            Some(data) => {
                let message: Message = serde_json::from_str(&data)?;
                messages.push(message);
            }
            None => break,
        }
    }
    if !messages.is_empty() {
        debug!(
            "Received {} urgent messages for {}",
            messages.len(),
            agent_id
        );
    }
    Ok(messages)
}
/// Number of messages currently waiting in the agent's urgent queue.
pub async fn urgent_len(&self, agent_id: AgentId) -> Result<usize> {
    let mut conn = self.conn.clone();
    let urgent_key = self.urgent_key(agent_id);
    let len: usize = conn.llen(&urgent_key).await?;
    Ok(len)
}

/// Raises the agent's stop flag.
///
/// The flag auto-expires after 3600 s so a crashed controller cannot
/// leave a permanent stop request behind.
#[instrument(skip(self))]
pub async fn request_stop(&self, agent_id: AgentId) -> Result<()> {
    let mut conn = self.conn.clone();
    let stop_key = self.stop_key(agent_id);
    let _: () = conn.set_ex(&stop_key, "1", 3600).await?;
    debug!("Requested stop for agent {}", agent_id);
    Ok(())
}

/// True while the agent's stop flag is set (key exists and not expired).
#[instrument(skip(self))]
pub async fn should_stop(&self, agent_id: AgentId) -> Result<bool> {
    let mut conn = self.conn.clone();
    let stop_key = self.stop_key(agent_id);
    let exists: bool = conn.exists(&stop_key).await?;
    Ok(exists)
}

/// Clears the agent's stop flag (no-op if it was not set).
pub async fn clear_stop(&self, agent_id: AgentId) -> Result<()> {
    let mut conn = self.conn.clone();
    let stop_key = self.stop_key(agent_id);
    let _: () = conn.del(&stop_key).await?;
    Ok(())
}
/// Blocking receive: waits up to `timeout` for the next inbox message.
///
/// Returns `Ok(None)` on timeout. Per Redis BLPOP semantics a zero
/// timeout blocks indefinitely.
#[instrument(skip(self))]
pub async fn receive(&self, agent_id: AgentId, timeout: Duration) -> Result<Option<Message>> {
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(agent_id);
    let result: Option<String> = conn.blpop(&inbox_key, timeout.as_secs_f64()).await?;
    match result {
        Some(data) => {
            let message: Message = serde_json::from_str(&data)?;
            debug!("Received message {} from inbox", message.id);
            Ok(Some(message))
        }
        None => Ok(None),
    }
}

/// Non-blocking receive: pops the next inbox message if one is waiting.
pub async fn try_receive(&self, agent_id: AgentId) -> Result<Option<Message>> {
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(agent_id);
    let result: Option<String> = conn.lpop(&inbox_key, None).await?;
    match result {
        Some(data) => {
            let message: Message = serde_json::from_str(&data)?;
            Ok(Some(message))
        }
        None => Ok(None),
    }
}

/// Number of messages currently queued in the agent's inbox.
pub async fn inbox_len(&self, agent_id: AgentId) -> Result<usize> {
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(agent_id);
    let len: usize = conn.llen(&inbox_key).await?;
    Ok(len)
}
/// Returns up to `count` messages from the front of the agent's inbox
/// without removing them. Entries that fail to deserialize are skipped.
///
/// A non-positive `count` now yields an empty vec. Previously
/// `count == 0` issued `LRANGE key 0 -1` — i.e. returned the ENTIRE
/// inbox — and negative counts read from the tail, both surprising for
/// a "peek N" API.
pub async fn peek_inbox(&self, agent_id: AgentId, count: isize) -> Result<Vec<Message>> {
    if count <= 0 {
        return Ok(Vec::new());
    }
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(agent_id);
    let items: Vec<String> = conn.lrange(&inbox_key, 0, count - 1).await?;
    // Skip malformed entries so one bad message cannot make the whole
    // inbox unreadable.
    let messages = items
        .iter()
        .filter_map(|item| serde_json::from_str::<Message>(item).ok())
        .collect();
    Ok(messages)
}
/// Publishes `message` on the town-wide broadcast pub/sub channel.
pub async fn broadcast(&self, message: &Message) -> Result<()> {
    let payload = serde_json::to_string(message)?;
    let channel = self.broadcast_channel();
    let mut conn = self.conn.clone();
    let _: () = conn.publish(channel, &payload).await?;
    Ok(())
}
/// Persists the full agent record into its Redis state hash.
///
/// Required fields are always written via HSET. Optional fields
/// (`current_task`, `nickname`, `role_id`, `parent_agent_id`) are either
/// written or HDELed — Redis hashes cannot store a "null", so a `None`
/// must remove any stale value left by a previous write. Both operations
/// run in one pipeline (single round trip).
pub async fn set_agent_state(&self, agent: &crate::agent::Agent) -> Result<()> {
    let mut conn = self.conn.clone();
    let key = self.state_key(agent.id);
    let mut fields: Vec<(String, String)> = vec![
        ("id".to_string(), agent.id.to_string()),
        ("name".to_string(), agent.name.clone()),
        (
            // Enums are serialized with serde then stripped of the JSON
            // quotes so the hash stores the bare variant name.
            "agent_type".to_string(),
            serde_json::to_string(&agent.agent_type)?
                .trim_matches('"')
                .to_string(),
        ),
        (
            "state".to_string(),
            serde_json::to_string(&agent.state)?
                .trim_matches('"')
                .to_string(),
        ),
        ("cli".to_string(), agent.cli.clone()),
        ("created_at".to_string(), agent.created_at.to_rfc3339()),
        (
            "last_heartbeat".to_string(),
            agent.last_heartbeat.to_rfc3339(),
        ),
        (
            "tasks_completed".to_string(),
            agent.tasks_completed.to_string(),
        ),
        (
            "rounds_completed".to_string(),
            agent.rounds_completed.to_string(),
        ),
        (
            "last_active_at".to_string(),
            agent.last_active_at.to_rfc3339(),
        ),
        (
            "spawn_mode".to_string(),
            serde_json::to_string(&agent.spawn_mode)?
                .trim_matches('"')
                .to_string(),
        ),
    ];
    // Optional fields: write when Some, delete any stale value when None.
    let mut fields_to_delete: Vec<&str> = Vec::new();
    if let Some(ref task_id) = agent.current_task {
        fields.push(("current_task".to_string(), task_id.to_string()));
    } else {
        fields_to_delete.push("current_task");
    }
    if let Some(ref nickname) = agent.nickname {
        fields.push(("nickname".to_string(), nickname.clone()));
    } else {
        fields_to_delete.push("nickname");
    }
    if let Some(ref role_id) = agent.role_id {
        fields.push(("role_id".to_string(), role_id.clone()));
    } else {
        fields_to_delete.push("role_id");
    }
    if let Some(ref parent_id) = agent.parent_agent_id {
        fields.push(("parent_agent_id".to_string(), parent_id.to_string()));
    } else {
        fields_to_delete.push("parent_agent_id");
    }
    // HDEL first, then HSET, in one pipelined round trip.
    let mut pipe = redis::pipe();
    if !fields_to_delete.is_empty() {
        pipe.hdel(&key, &fields_to_delete);
    }
    pipe.hset_multiple(&key, &fields);
    let _: () = pipe.query_async(&mut conn).await?;
    Ok(())
}
/// Loads an agent record from its state hash; `None` when absent.
pub async fn get_agent_state(&self, agent_id: AgentId) -> Result<Option<crate::agent::Agent>> {
    let key = self.state_key(agent_id);
    let mut conn = self.conn.clone();
    let fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
    // HGETALL on a missing key yields an empty map, not an error.
    if fields.is_empty() {
        return Ok(None);
    }
    Self::parse_agent_from_hash(fields).map(Some)
}
/// Reconstructs an `Agent` from the string fields of its Redis hash.
///
/// Parsing is deliberately lenient: only `id` is required; every other
/// field falls back to a default (empty string, zero, `now`, or the
/// enum's `Default`) so a partially written hash still round-trips.
fn parse_agent_from_hash(
    fields: std::collections::HashMap<String, String>,
) -> Result<crate::agent::Agent> {
    use chrono::DateTime;
    // `id` is the only mandatory field; a missing/invalid one is an error.
    let id: AgentId = fields
        .get("id")
        .ok_or_else(|| crate::error::Error::AgentNotFound("Missing id field".to_string()))?
        .parse()
        .map_err(|e| crate::error::Error::AgentNotFound(format!("Invalid agent id: {}", e)))?;
    let name = fields.get("name").cloned().unwrap_or_default();
    // Enums were stored as bare variant names (quotes stripped on write),
    // so re-wrap in quotes to make them valid JSON strings again.
    let agent_type: crate::agent::AgentType = fields
        .get("agent_type")
        .map(|s| serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default())
        .unwrap_or_default();
    let state: crate::agent::AgentState = fields
        .get("state")
        .map(|s| serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default())
        .unwrap_or_default();
    let cli = fields.get("cli").cloned().unwrap_or_default();
    let current_task = fields.get("current_task").and_then(|s| s.parse().ok());
    // Timestamps default to "now" when absent or unparseable.
    let created_at = fields
        .get("created_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(chrono::Utc::now);
    let last_heartbeat = fields
        .get("last_heartbeat")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(chrono::Utc::now);
    let tasks_completed = fields
        .get("tasks_completed")
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let rounds_completed = fields
        .get("rounds_completed")
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    // Falls back to `created_at` rather than `now` for a saner default.
    let last_active_at = fields
        .get("last_active_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or(created_at);
    let nickname = fields.get("nickname").cloned();
    let role_id = fields.get("role_id").cloned();
    let parent_agent_id = fields.get("parent_agent_id").and_then(|s| s.parse().ok());
    let spawn_mode: crate::agent::SpawnMode = fields
        .get("spawn_mode")
        .map(|s| serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default())
        .unwrap_or_default();
    Ok(crate::agent::Agent {
        id,
        name,
        nickname,
        role_id,
        parent_agent_id,
        spawn_mode,
        agent_type,
        state,
        cli,
        current_task,
        created_at,
        last_heartbeat,
        tasks_completed,
        rounds_completed,
        last_active_at,
    })
}
/// Lists every agent in this town.
///
/// Uses the cursor-based `scan_keys` helper instead of `KEYS`: `KEYS`
/// blocks the Redis server while walking the entire keyspace, and the
/// file's other key-enumeration paths (`reset_all`,
/// `reset_agents_only`) already use SCAN. Hashes that are empty or fail
/// to parse are skipped.
pub async fn list_agents(&self) -> Result<Vec<crate::agent::Agent>> {
    let mut conn = self.conn.clone();
    let keys = self.scan_keys(&self.agent_key_pattern()).await?;
    let mut agents = Vec::new();
    for key in keys {
        let fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
        if !fields.is_empty()
            && let Ok(agent) = Self::parse_agent_from_hash(fields)
        {
            agents.push(agent);
        }
    }
    Ok(agents)
}
/// Finds an agent by exact `name`; `None` when no agent matches.
pub async fn get_agent_by_name(&self, name: &str) -> Result<Option<crate::agent::Agent>> {
    let found = self
        .list_agents()
        .await?
        .into_iter()
        .find(|agent| agent.name == name);
    Ok(found)
}
/// Removes every key belonging to an agent: state hash, inbox, urgent
/// queue, activity log, and stop flag.
///
/// Issues one multi-key `DEL` (a single round trip, applied atomically)
/// instead of the previous five sequential `DEL` calls.
pub async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
    let mut conn = self.conn.clone();
    let keys = [
        self.state_key(agent_id),
        self.inbox_key(agent_id),
        self.urgent_key(agent_id),
        self.activity_key(agent_id),
        self.stop_key(agent_id),
    ];
    let _: () = conn.del(&keys[..]).await?;
    Ok(())
}
/// Atomically increments the agent's `rounds_completed` counter and
/// returns the new value (HINCRBY is atomic server-side).
#[allow(dead_code)]
#[instrument(skip(self))]
pub async fn increment_agent_rounds(&self, agent_id: AgentId) -> Result<u64> {
    let mut conn = self.conn.clone();
    let key = self.state_key(agent_id);
    let new_value: i64 = conn.hincr(&key, "rounds_completed", 1).await?;
    debug!(
        "Agent {} rounds_completed incremented to {}",
        agent_id, new_value
    );
    Ok(new_value as u64)
}

/// Atomically increments the agent's `tasks_completed` counter and
/// returns the new value.
#[allow(dead_code)]
#[instrument(skip(self))]
pub async fn increment_agent_tasks_completed(&self, agent_id: AgentId) -> Result<u64> {
    let mut conn = self.conn.clone();
    let key = self.state_key(agent_id);
    let new_value: i64 = conn.hincr(&key, "tasks_completed", 1).await?;
    debug!(
        "Agent {} tasks_completed incremented to {}",
        agent_id, new_value
    );
    Ok(new_value as u64)
}

/// Stamps the agent's `last_heartbeat` field with the current UTC time.
#[allow(dead_code)]
#[instrument(skip(self))]
pub async fn update_agent_heartbeat(&self, agent_id: AgentId) -> Result<()> {
    let mut conn = self.conn.clone();
    let key = self.state_key(agent_id);
    let now = chrono::Utc::now().to_rfc3339();
    let _: () = conn.hset(&key, "last_heartbeat", &now).await?;
    Ok(())
}
/// Persists a task into its Redis hash.
///
/// Mirrors `set_agent_state`: required fields are HSET, optional ones
/// (`assigned_to`, `started_at`, `completed_at`, `result`, `parent_id`)
/// are written when `Some` and HDELed when `None` so stale values from
/// earlier writes cannot linger. Runs as one pipeline.
pub async fn set_task(&self, task: &crate::task::Task) -> Result<()> {
    let mut conn = self.conn.clone();
    let key = self.task_key(task.id);
    let mut fields: Vec<(String, String)> = vec![
        ("id".to_string(), task.id.to_string()),
        ("description".to_string(), task.description.clone()),
        (
            // Enum stored as the bare variant name (JSON quotes stripped).
            "state".to_string(),
            serde_json::to_string(&task.state)?
                .trim_matches('"')
                .to_string(),
        ),
        ("created_at".to_string(), task.created_at.to_rfc3339()),
        ("updated_at".to_string(), task.updated_at.to_rfc3339()),
        // Tags keep full JSON encoding (it's a list, not a scalar).
        ("tags".to_string(), serde_json::to_string(&task.tags)?),
    ];
    let mut fields_to_delete: Vec<&str> = Vec::new();
    if let Some(ref agent_id) = task.assigned_to {
        fields.push(("assigned_to".to_string(), agent_id.to_string()));
    } else {
        fields_to_delete.push("assigned_to");
    }
    if let Some(ref started_at) = task.started_at {
        fields.push(("started_at".to_string(), started_at.to_rfc3339()));
    } else {
        fields_to_delete.push("started_at");
    }
    if let Some(ref completed_at) = task.completed_at {
        fields.push(("completed_at".to_string(), completed_at.to_rfc3339()));
    } else {
        fields_to_delete.push("completed_at");
    }
    if let Some(ref result) = task.result {
        fields.push(("result".to_string(), result.clone()));
    } else {
        fields_to_delete.push("result");
    }
    if let Some(ref parent_id) = task.parent_id {
        fields.push(("parent_id".to_string(), parent_id.to_string()));
    } else {
        fields_to_delete.push("parent_id");
    }
    // HDEL then HSET in a single pipelined round trip.
    let mut pipe = redis::pipe();
    if !fields_to_delete.is_empty() {
        pipe.hdel(&key, &fields_to_delete);
    }
    pipe.hset_multiple(&key, &fields);
    let _: () = pipe.query_async(&mut conn).await?;
    Ok(())
}
/// Loads a task from its Redis hash; `None` when the task does not exist.
pub async fn get_task(
    &self,
    task_id: crate::task::TaskId,
) -> Result<Option<crate::task::Task>> {
    let mut conn = self.conn.clone();
    let key = self.task_key(task_id);
    // HGETALL on a missing key yields an empty map, not an error.
    let fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
    if fields.is_empty() {
        return Ok(None);
    }
    let task = Self::parse_task_from_hash(fields)?;
    Ok(Some(task))
}

/// Deletes a task's hash; returns whether anything was actually removed.
pub async fn delete_task(&self, task_id: crate::task::TaskId) -> Result<bool> {
    let mut conn = self.conn.clone();
    let key = self.task_key(task_id);
    // DEL returns the number of keys removed (0 or 1 here).
    let deleted: i64 = conn.del(&key).await?;
    if deleted > 0 {
        debug!("Deleted task {}", task_id);
    }
    Ok(deleted > 0)
}
/// Reconstructs a `Task` from the string fields of its Redis hash.
///
/// Lenient like `parse_agent_from_hash`: only `id` is required; other
/// fields fall back to defaults (`now` for required timestamps, `None`
/// for optional ones, empty for strings/lists).
fn parse_task_from_hash(
    fields: std::collections::HashMap<String, String>,
) -> Result<crate::task::Task> {
    use chrono::DateTime;
    let id: crate::task::TaskId = fields
        .get("id")
        .ok_or_else(|| crate::error::Error::TaskNotFound("Missing id field".to_string()))?
        .parse()
        .map_err(|e| crate::error::Error::TaskNotFound(format!("Invalid task id: {}", e)))?;
    let description = fields.get("description").cloned().unwrap_or_default();
    // State was stored as a bare variant name; re-wrap in quotes so
    // serde can deserialize it as a JSON string.
    let state: crate::task::TaskState = fields
        .get("state")
        .map(|s| {
            serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default()
        })
        .unwrap_or_default();
    let created_at = fields
        .get("created_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(chrono::Utc::now);
    let updated_at = fields
        .get("updated_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(chrono::Utc::now);
    let assigned_to = fields.get("assigned_to").and_then(|s| s.parse().ok());
    // Optional timestamps stay `None` when absent or unparseable.
    let started_at = fields
        .get("started_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc));
    let completed_at = fields
        .get("completed_at")
        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&chrono::Utc));
    let result = fields.get("result").cloned();
    let parent_id = fields.get("parent_id").and_then(|s| s.parse().ok());
    // Tags were stored as a JSON array.
    let tags: Vec<String> = fields
        .get("tags")
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or_default();
    Ok(crate::task::Task {
        id,
        description,
        state,
        assigned_to,
        created_at,
        updated_at,
        started_at,
        completed_at,
        result,
        parent_id,
        tags,
    })
}
/// Lists every task in this town.
///
/// Uses the cursor-based `scan_keys` helper instead of `KEYS`, which
/// blocks the Redis server while walking the whole keyspace (and was
/// inconsistent with the SCAN-based `reset_*` paths). Hashes that are
/// empty or fail to parse are skipped.
pub async fn list_tasks(&self) -> Result<Vec<crate::task::Task>> {
    let mut conn = self.conn.clone();
    let keys = self.scan_keys(&self.task_key_pattern()).await?;
    let mut tasks = Vec::new();
    for key in keys {
        let fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
        if !fields.is_empty()
            && let Ok(task) = Self::parse_task_from_hash(fields)
        {
            tasks.push(task);
        }
    }
    Ok(tasks)
}
#[instrument(skip(self, activity))]
pub async fn log_agent_activity(&self, agent_id: AgentId, activity: &str) -> Result<()> {
let mut conn = self.conn.clone();
let key = self.activity_key(agent_id);
let _: () = conn.lpush(&key, activity).await?;
let _: () = conn.ltrim(&key, 0, ACTIVITY_MAX_ENTRIES - 1).await?;
let _: () = conn.expire(&key, ACTIVITY_TTL_SECS as i64).await?;
debug!("Logged activity for agent {}", agent_id);
Ok(())
}
/// Returns the agent's most recent activity entries joined by newlines,
/// or `None` if the log is empty/expired.
///
/// NOTE(review): reads only the 5 newest entries (LRANGE 0..4) even
/// though up to `ACTIVITY_MAX_ENTRIES` (10) are retained — confirm the
/// smaller display window is intentional.
#[instrument(skip(self))]
pub async fn get_agent_activity(&self, agent_id: AgentId) -> Result<Option<String>> {
    let mut conn = self.conn.clone();
    let key = self.activity_key(agent_id);
    let entries: Vec<String> = conn.lrange(&key, 0, 4).await?;
    if entries.is_empty() {
        Ok(None)
    } else {
        Ok(Some(entries.join("\n")))
    }
}
/// Builds an `EventStream` handle sharing this channel's connection.
pub fn event_stream(&self) -> EventStream {
    EventStream::new(self.conn.clone(), self.town_name())
}

/// Emits a town event, logging (not propagating) any failure —
/// event emission is deliberately best-effort.
pub async fn emit_event(&self, event: &TownEvent) {
    let es = self.event_stream();
    if let Err(e) = es.emit(event).await {
        debug!("Failed to emit event: {}", e);
    }
}
/// Appends a task id to the tail of the town backlog (FIFO).
pub async fn backlog_push(&self, task_id: crate::task::TaskId) -> Result<()> {
    let mut conn = self.conn.clone();
    let backlog_key = self.backlog_key();
    let _: () = conn.rpush(&backlog_key, task_id.to_string()).await?;
    debug!("Added task {} to backlog", task_id);
    Ok(())
}

/// Returns all backlog task ids in queue order, silently skipping
/// entries that fail to parse.
pub async fn backlog_list(&self) -> Result<Vec<crate::task::TaskId>> {
    let mut conn = self.conn.clone();
    let backlog_key = self.backlog_key();
    let items: Vec<String> = conn.lrange(&backlog_key, 0, -1).await?;
    let mut task_ids = Vec::new();
    for item in items {
        if let Ok(task_id) = item.parse() {
            task_ids.push(task_id);
        }
    }
    Ok(task_ids)
}

/// Number of tasks currently in the backlog.
pub async fn backlog_len(&self) -> Result<usize> {
    let mut conn = self.conn.clone();
    let backlog_key = self.backlog_key();
    let len: usize = conn.llen(&backlog_key).await?;
    Ok(len)
}

/// Pops the oldest backlog entry (FIFO: RPUSH in, LPOP out).
///
/// Unlike `backlog_list`, an unparseable entry here is an error —
/// the entry has already been removed and would otherwise be lost.
pub async fn backlog_pop(&self) -> Result<Option<crate::task::TaskId>> {
    let mut conn = self.conn.clone();
    let backlog_key = self.backlog_key();
    let result: Option<String> = conn.lpop(&backlog_key, None).await?;
    match result {
        Some(id) => {
            let task_id = id.parse().map_err(|e| {
                crate::error::Error::TaskNotFound(format!("Invalid task ID: {}", e))
            })?;
            debug!("Popped task {} from backlog", task_id);
            Ok(Some(task_id))
        }
        None => Ok(None),
    }
}

/// Removes the first occurrence of `task_id` from the backlog;
/// returns whether anything was removed.
pub async fn backlog_remove(&self, task_id: crate::task::TaskId) -> Result<bool> {
    let mut conn = self.conn.clone();
    let backlog_key = self.backlog_key();
    // LREM count=1: remove a single matching element, head-to-tail.
    let removed: i64 = conn.lrem(&backlog_key, 1, task_id.to_string()).await?;
    if removed > 0 {
        debug!("Removed task {} from backlog", task_id);
    }
    Ok(removed > 0)
}
/// Removes and returns every message in the agent's inbox, silently
/// skipping entries that fail to deserialize (they are still removed).
pub async fn drain_inbox(&self, agent_id: AgentId) -> Result<Vec<Message>> {
    let mut conn = self.conn.clone();
    let inbox_key = self.inbox_key(agent_id);
    let mut messages = Vec::new();
    loop {
        // One element per LPOP; nil terminates the drain.
        let result: Option<String> = conn.lpop(&inbox_key, None).await?;
        match result {
            Some(data) => {
                if let Ok(msg) = serde_json::from_str::<Message>(&data) {
                    messages.push(msg);
                }
            }
            None => break,
        }
    }
    debug!(
        "Drained {} messages from agent {}",
        messages.len(),
        agent_id
    );
    Ok(messages)
}

/// Re-delivers a copy of `message` to `to_agent`'s inbox. The original
/// message is not removed from anywhere; only the recipient changes.
pub async fn move_message_to_inbox(&self, message: &Message, to_agent: AgentId) -> Result<()> {
    let mut new_msg = message.clone();
    new_msg.to = to_agent;
    self.send(&new_msg).await
}
/// Collects all keys matching `pattern` using cursor-based SCAN
/// (non-blocking, unlike KEYS).
///
/// COUNT 100 is only a per-iteration hint; the loop follows the cursor
/// until Redis returns 0, which marks a complete iteration.
async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
    let mut conn = self.conn.clone();
    let mut cursor: u64 = 0;
    let mut all_keys = Vec::new();
    loop {
        let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg(pattern)
            .arg("COUNT")
            .arg(100)
            .query_async(&mut conn)
            .await?;
        all_keys.extend(keys);
        cursor = next_cursor;
        if cursor == 0 {
            break;
        }
    }
    Ok(all_keys)
}
/// Deletes every key belonging to this town; returns how many keys
/// were matched (and deleted) at scan time.
pub async fn reset_all(&self) -> Result<usize> {
    let mut conn = self.conn.clone();
    let pattern = self.town_key_pattern();
    let keys = self.scan_keys(&pattern).await?;
    if keys.is_empty() {
        // DEL with zero keys would be a protocol error; bail early.
        return Ok(0);
    }
    let count = keys.len();
    let _: () = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
    debug!(
        "Reset: deleted {} keys for town '{}'",
        count,
        self.town_name()
    );
    Ok(count)
}

/// Deletes only agent-related keys (state, inboxes, urgent queues,
/// stop flags, activity logs) — tasks, backlog, and docket survive.
pub async fn reset_agents_only(&self) -> Result<usize> {
    let mut conn = self.conn.clone();
    let mut keys = Vec::new();
    keys.extend(self.scan_keys(&self.agent_key_pattern()).await?);
    keys.extend(self.scan_keys(&self.inbox_key_pattern()).await?);
    keys.extend(self.scan_keys(&self.urgent_key_pattern()).await?);
    keys.extend(self.scan_keys(&self.stop_key_pattern()).await?);
    keys.extend(self.scan_keys(&self.activity_key_pattern()).await?);
    if keys.is_empty() {
        return Ok(0);
    }
    let count = keys.len();
    let _: () = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
    debug!(
        "Reset agents only: deleted {} keys for town '{}'",
        count,
        self.town_name()
    );
    Ok(count)
}
/// Consumer-group name used on the docket task stream.
const DOCKET_GROUP: &'static str = "workers";

/// Ensures the docket consumer group exists, creating the stream too
/// (MKSTREAM) if necessary. Idempotent.
///
/// "Group already exists" is detected via `RedisError::code()` ==
/// `BUSYGROUP` rather than substring-matching the `Display` output,
/// which was fragile against error-message formatting changes.
pub async fn docket_ensure_group(&self) -> Result<()> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let result: redis::RedisResult<()> = redis::cmd("XGROUP")
        .arg("CREATE")
        .arg(&key)
        .arg(Self::DOCKET_GROUP)
        .arg("$")
        .arg("MKSTREAM")
        .query_async(&mut conn)
        .await;
    match result {
        Ok(()) => {
            debug!("Created docket consumer group on {}", key);
            Ok(())
        }
        // XGROUP CREATE replies BUSYGROUP when the group exists;
        // for "ensure" semantics that is success.
        Err(e) if e.code() == Some("BUSYGROUP") => {
            debug!("Docket consumer group already exists on {}", key);
            Ok(())
        }
        Err(e) => Err(e.into()),
    }
}
/// Appends a task-assignment entry to the docket task stream and
/// returns the server-generated entry id.
pub async fn docket_push(
    &self,
    task_id: crate::task::TaskId,
    description: &str,
    priority: &str,
    from: &str,
    to: &str,
) -> Result<String> {
    let key = self.docket_tasks_key();
    let stamp = chrono::Utc::now().to_rfc3339();
    let mut conn = self.conn.clone();
    // "*" asks Redis to auto-generate the entry id.
    let entry_id: String = redis::cmd("XADD")
        .arg(&key)
        .arg("*")
        .arg("task_id")
        .arg(task_id.to_string())
        .arg("type")
        .arg("task_assign")
        .arg("message")
        .arg(description)
        .arg("priority")
        .arg(priority)
        .arg("from")
        .arg(from)
        .arg("to")
        .arg(to)
        .arg("timestamp")
        .arg(&stamp)
        .query_async(&mut conn)
        .await?;
    debug!("Docket XADD task {} -> entry {}", task_id, entry_id);
    Ok(entry_id)
}
/// Reads at most one new entry from the docket stream for
/// `consumer_name`, blocking up to `block_ms` milliseconds.
///
/// Returns the entry id and its field map, or `None` on timeout.
/// `">"` asks for entries never delivered to this group; the entry
/// stays pending until `docket_ack` is called.
pub async fn docket_read(
    &self,
    consumer_name: &str,
    block_ms: usize,
) -> Result<Option<(String, std::collections::HashMap<String, String>)>> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let result: redis::Value = redis::cmd("XREADGROUP")
        .arg("GROUP")
        .arg(Self::DOCKET_GROUP)
        .arg(consumer_name)
        .arg("COUNT")
        .arg(1)
        .arg("BLOCK")
        .arg(block_ms)
        .arg("STREAMS")
        .arg(&key)
        .arg(">")
        .query_async(&mut conn)
        .await?;
    Self::parse_xread_single(result)
}

/// Acknowledges a delivered entry, removing it from the group's
/// pending list.
pub async fn docket_ack(&self, entry_id: &str) -> Result<()> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let _: i64 = redis::cmd("XACK")
        .arg(&key)
        .arg(Self::DOCKET_GROUP)
        .arg(entry_id)
        .query_async(&mut conn)
        .await?;
    debug!("Docket XACK entry {}", entry_id);
    Ok(())
}

/// Total number of entries in the docket stream (acked or not).
pub async fn docket_len(&self) -> Result<usize> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let len: usize = redis::cmd("XLEN").arg(&key).query_async(&mut conn).await?;
    Ok(len)
}
/// Number of entries delivered to the group but not yet acknowledged.
///
/// Summary-form XPENDING replies with an array whose first element is
/// the pending count; anything else conservatively maps to 0.
pub async fn docket_pending_count(&self) -> Result<usize> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let result: redis::Value = redis::cmd("XPENDING")
        .arg(&key)
        .arg(Self::DOCKET_GROUP)
        .query_async(&mut conn)
        .await?;
    match result {
        redis::Value::Array(ref items) if !items.is_empty() => match &items[0] {
            redis::Value::Int(n) => Ok(*n as usize),
            _ => Ok(0),
        },
        _ => Ok(0),
    }
}

/// Reads the group's `lag` (entries not yet delivered) from
/// `XINFO GROUPS`. Returns `None` when the group is missing or the
/// server does not report a lag field.
pub async fn docket_group_lag(&self) -> Result<Option<usize>> {
    let mut conn = self.conn.clone();
    let key = self.docket_tasks_key();
    let result: redis::Value = redis::cmd("XINFO")
        .arg("GROUPS")
        .arg(&key)
        .query_async(&mut conn)
        .await?;
    for group in Self::parse_xinfo_groups(result) {
        let Some(name) = group.get("name") else {
            continue;
        };
        if name == Self::DOCKET_GROUP {
            return Ok(group
                .get("lag")
                .and_then(|value| value.parse::<usize>().ok()));
        }
    }
    Ok(None)
}
/// Appends a lifecycle event for `task_id` to the docket event stream
/// and returns the server-generated entry id.
pub async fn docket_log_event(
    &self,
    task_id: crate::task::TaskId,
    event_type: &str,
    detail: &str,
) -> Result<String> {
    let key = self.docket_events_key();
    let stamp = chrono::Utc::now().to_rfc3339();
    let mut conn = self.conn.clone();
    // "*" asks Redis to auto-generate the entry id.
    let entry_id: String = redis::cmd("XADD")
        .arg(&key)
        .arg("*")
        .arg("task_id")
        .arg(task_id.to_string())
        .arg("event")
        .arg(event_type)
        .arg("detail")
        .arg(detail)
        .arg("timestamp")
        .arg(&stamp)
        .query_async(&mut conn)
        .await?;
    debug!(
        "Docket event {} for task {} -> {}",
        event_type, task_id, entry_id
    );
    Ok(entry_id)
}
/// Extracts the first entry from an XREADGROUP reply.
///
/// The raw reply shape is:
///   [[stream_name, [[entry_id, [field, value, field, value, ...]], ...]], ...]
/// This walks that nesting down to the first entry of the first stream
/// and turns its flat field/value list into a map. Any shape mismatch
/// (including the nil reply on timeout) yields `Ok(None)` rather than
/// an error, since the caller treats it as "nothing to read".
fn parse_xread_single(
    value: redis::Value,
) -> Result<Option<(String, std::collections::HashMap<String, String>)>> {
    use redis::Value;
    let streams = match value {
        Value::Nil => return Ok(None),
        Value::Array(s) => s,
        _ => return Ok(None),
    };
    // First stream: [stream_name, entries].
    let stream = match streams.into_iter().next() {
        Some(Value::Array(s)) => s,
        _ => return Ok(None),
    };
    // Index 1 is the entry list (index 0 is the stream name).
    let entries = match stream.into_iter().nth(1) {
        Some(Value::Array(e)) => e,
        _ => return Ok(None),
    };
    // First entry: [entry_id, flat field/value array].
    let entry = match entries.into_iter().next() {
        Some(Value::Array(e)) => e,
        _ => return Ok(None),
    };
    let mut entry_iter = entry.into_iter();
    let entry_id = match entry_iter.next() {
        Some(Value::BulkString(b)) => String::from_utf8_lossy(&b).to_string(),
        _ => return Ok(None),
    };
    let fields_raw = match entry_iter.next() {
        Some(Value::Array(f)) => f,
        _ => return Ok(None),
    };
    // Fields arrive flat: [k1, v1, k2, v2, ...] — pair them up.
    let mut fields = std::collections::HashMap::new();
    let mut field_iter = fields_raw.into_iter();
    while let (Some(k), Some(v)) = (field_iter.next(), field_iter.next()) {
        if let (Value::BulkString(kb), Value::BulkString(vb)) = (k, v) {
            fields.insert(
                String::from_utf8_lossy(&kb).to_string(),
                String::from_utf8_lossy(&vb).to_string(),
            );
        }
    }
    Ok(Some((entry_id, fields)))
}
/// Converts an `XINFO GROUPS` reply into one string map per group.
///
/// Handles both reply encodings: RESP2 returns each group as a flat
/// [k, v, k, v, ...] array, RESP3 as a native map. Unrecognized value
/// shapes are dropped rather than treated as errors.
fn parse_xinfo_groups(value: redis::Value) -> Vec<std::collections::HashMap<String, String>> {
    use redis::Value;
    // Best-effort scalar-to-string conversion; non-scalar values drop out.
    fn value_to_string(value: Value) -> Option<String> {
        match value {
            Value::BulkString(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
            Value::SimpleString(text) => Some(text),
            Value::Int(number) => Some(number.to_string()),
            Value::Nil => None,
            _ => None,
        }
    }
    let groups = match value {
        Value::Array(groups) => groups,
        _ => return Vec::new(),
    };
    groups
        .into_iter()
        .filter_map(|group| match group {
            // RESP2: flat [key, value, key, value, ...] array.
            Value::Array(fields) => {
                let mut parsed = std::collections::HashMap::new();
                let mut iter = fields.into_iter();
                while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
                    if let (Some(key), Some(value)) =
                        (value_to_string(key), value_to_string(value))
                    {
                        parsed.insert(key, value);
                    }
                }
                Some(parsed)
            }
            // RESP3: native map of key/value pairs.
            Value::Map(entries) => {
                let mut parsed = std::collections::HashMap::new();
                for (key, value) in entries {
                    if let (Some(key), Some(value)) =
                        (value_to_string(key), value_to_string(value))
                    {
                        parsed.insert(key, value);
                    }
                }
                Some(parsed)
            }
            _ => None,
        })
        .collect()
}
}