use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use redis::Client;
use redis::aio::ConnectionManager;
use tokio::process::Child;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use crate::agent::{Agent, AgentId, AgentState, AgentType};
use crate::channel::Channel;
use crate::config::Config;
use crate::error::{Error, Result};
use crate::events::EventStream;
use crate::global_config::GlobalConfig;
use crate::message::{Message, MessageType};
use crate::task::{Task, TaskId};
/// Per-town state directory, relative to the town root.
pub const TT_DIR: &str = ".tt";
/// Agents directory, relative to the town root (created by `Town::init_with_config`).
const AGENTS_DIR: &str = ".tt/agents";
/// Logs directory, relative to the town root (created by `Town::init_with_config`).
const LOGS_DIR: &str = ".tt/logs";
/// Tasks directory, relative to the town root (created by `Town::init_with_config`).
const TASKS_DIR: &str = ".tt/tasks";
/// Minimum `(major, minor)` version of the local `redis-server` binary, enforced only when
/// Redis is managed locally (i.e. the config is not a remote Redis).
const MIN_MANAGED_REDIS_VERSION: (u32, u32) = (8, 0);
/// Minimum `(major, minor)` version the connected server must report via `INFO server`
/// during the startup health check; this check also runs against remote Redis.
const MIN_HEALTHCHECK_REDIS_VERSION: (u32, u32) = (7, 0);
/// Handle to a Tinytown town: its configuration, the Redis-backed message channel, and
/// a map of agents registered through `spawn_agent` on this handle.
///
/// Cloning is cheap for the shared maps (they are behind `Arc`), so clones observe the
/// same `agents`/`processes` state.
#[derive(Clone)]
pub struct Town {
    // Loaded or freshly created town configuration (root path, name, Redis settings).
    config: Config,
    // Message/state channel backed by a Redis connection.
    channel: Channel,
    // Agents inserted by `spawn_agent`; note `list_agents` reads from the channel instead.
    agents: Arc<RwLock<HashMap<AgentId, Agent>>>,
    // Currently never read (hence the lint expectation); presumably reserved for tracking
    // agent child processes — TODO confirm.
    #[expect(dead_code)]
    processes: Arc<RwLock<HashMap<AgentId, Child>>>,
}
/// PID file for a town-local managed Redis, relative to the town root.
const REDIS_PID_FILE: &str = ".tt/redis.pid";
/// Process-wide async lock that serializes startup of the central (shared) Redis server.
/// It is process-local and does not coordinate with other processes.
static CENTRAL_REDIS_START_LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
/// Locates the `redis-server` executable to launch.
///
/// Prefers a bootstrapped copy at `~/.tt/bin/redis-server` when it exists; otherwise
/// returns the bare name `redis-server` so the OS resolves it through `PATH`.
fn find_redis_server() -> std::path::PathBuf {
    let bootstrapped = dirs::home_dir()
        .map(|home_dir| home_dir.join(".tt/bin/redis-server"))
        .filter(|candidate| candidate.exists());

    match bootstrapped {
        Some(binary) => {
            debug!("Using bootstrapped Redis: {}", binary.display());
            binary
        }
        None => std::path::PathBuf::from("redis-server"),
    }
}
impl Town {
    /// Checks that the `redis-server` binary selected by `find_redis_server` is at least
    /// `MIN_MANAGED_REDIS_VERSION`.
    ///
    /// Runs `redis-server --version` synchronously. It is called from async constructors,
    /// so it briefly blocks the calling thread.
    ///
    /// # Errors
    /// - `Error::RedisNotInstalled` if the binary cannot be run or exits unsuccessfully.
    /// - `Error::RedisVersionTooOld` if the version is below the minimum or unparsable.
    fn check_managed_redis_version() -> Result<()> {
        use std::process::Command as StdCommand;
        let redis_bin = find_redis_server();
        let output = StdCommand::new(&redis_bin)
            .arg("--version")
            .output()
            .map_err(|_| Error::RedisNotInstalled)?;
        if !output.status.success() {
            return Err(Error::RedisNotInstalled);
        }
        let version_str = String::from_utf8_lossy(&output.stdout);
        let version = Self::parse_redis_version(&version_str)?;
        if version < MIN_MANAGED_REDIS_VERSION {
            return Err(Error::RedisVersionTooOld(format!(
                "{}.{}",
                version.0, version.1
            )));
        }
        info!("Redis version {}.{} detected ✓", version.0, version.1);
        Ok(())
    }

    /// Parses `(major, minor)` from `redis-server --version` output, which carries a
    /// `v=<major>.<minor>...` token.
    ///
    /// # Errors
    /// `Error::RedisVersionTooOld` holding the offending text, or `"unknown"` when no
    /// `v=` token is present.
    fn parse_redis_version(version_str: &str) -> Result<(u32, u32)> {
        let version_part = version_str
            .split("v=")
            .nth(1)
            .and_then(|s| s.split_whitespace().next())
            .ok_or_else(|| Error::RedisVersionTooOld("unknown".to_string()))?;
        let parts: Vec<&str> = version_part.split('.').collect();
        if parts.len() < 2 {
            return Err(Error::RedisVersionTooOld(version_part.to_string()));
        }
        let major = parts[0]
            .parse::<u32>()
            .map_err(|_| Error::RedisVersionTooOld(version_part.to_string()))?;
        let minor = parts[1]
            .parse::<u32>()
            .map_err(|_| Error::RedisVersionTooOld(version_part.to_string()))?;
        Ok((major, minor))
    }

    /// Creates a new town named `name` rooted at `path`.
    ///
    /// This builds a default `Config` and delegates to `init_with_config`.
    pub async fn init(path: impl AsRef<Path>, name: impl Into<String>) -> Result<Self> {
        let path = path.as_ref();
        let name = name.into();
        let config = Config::new(&name, path);
        Self::init_with_config(config).await
    }

    /// Initializes a town from `config`.
    ///
    /// The steps are:
    /// - check the local Redis version (managed Redis only);
    /// - create the root, `.tt`, agents, logs and tasks directories;
    /// - save the config;
    /// - start Redis (a no-op for remote Redis);
    /// - connect.
    pub async fn init_with_config(config: Config) -> Result<Self> {
        let path = &config.root;
        if !config.is_remote_redis() {
            Self::check_managed_redis_version()?;
        }
        info!("Initializing town '{}' at {}", config.name, path.display());
        std::fs::create_dir_all(path)?;
        std::fs::create_dir_all(path.join(TT_DIR))?;
        std::fs::create_dir_all(path.join(AGENTS_DIR))?;
        std::fs::create_dir_all(path.join(LOGS_DIR))?;
        std::fs::create_dir_all(path.join(TASKS_DIR))?;
        config.save()?;
        Self::start_redis(&config).await?;
        let channel = Self::connect_redis(&config).await?;
        Ok(Self::from_parts(config, channel))
    }

    /// Connects to an existing town whose config is stored under `path`.
    ///
    /// For managed Redis:
    /// - If the endpoint is not reachable, Redis is started first.
    /// - If it is reachable but the connection fails, the error is logged and Redis is
    ///   restarted once.
    ///
    /// For remote Redis, connection errors are returned as-is.
    pub async fn connect(path: impl AsRef<Path>) -> Result<Self> {
        let config = Config::load(&path)?;
        let managed = !config.is_remote_redis();
        if managed {
            Self::check_managed_redis_version()?;
        }
        // Remote Redis is assumed up; for managed Redis use a cheap, bounded probe.
        let redis_appears_ready = !managed || Self::redis_endpoint_reachable(&config);
        let channel = if redis_appears_ready {
            match Self::connect_redis(&config).await {
                Ok(ch) => ch,
                Err(err) if managed => {
                    // Surface why we are restarting instead of silently discarding it.
                    warn!("Redis not responding ({}), restarting...", err);
                    Self::start_redis(&config).await?;
                    Self::connect_redis(&config).await?
                }
                Err(err) => return Err(err),
            }
        } else {
            debug!("Redis not found, starting...");
            Self::start_redis(&config).await?;
            Self::connect_redis(&config).await?
        };
        Ok(Self::from_parts(config, channel))
    }

    /// Builds a `Town` from a config and an established channel, with empty agent maps.
    fn from_parts(config: Config, channel: Channel) -> Self {
        Self {
            config,
            channel,
            agents: Arc::new(RwLock::new(HashMap::new())),
            processes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Starts a managed `redis-server` (daemonized) for `config` and waits until it
    /// answers PING.
    ///
    /// Remote Redis is a no-op. A central Redis that is already running is only waited on.
    ///
    /// # Errors
    /// - I/O errors from directory/socket handling or from spawning the binary.
    /// - `Error::Timeout` if the launcher exits unsuccessfully or Redis never becomes ready.
    async fn start_redis(config: &Config) -> Result<()> {
        use std::ffi::OsString;

        if config.is_remote_redis() {
            info!("Using external Redis: {}", config.redis_url_redacted());
            return Ok(());
        }
        let is_central = config.is_central_redis();
        // Serialize central startup within this process so concurrent callers don't race
        // to launch duplicate servers. This does not coordinate across processes.
        let _central_guard = if is_central {
            Some(
                CENTRAL_REDIS_START_LOCK
                    .get_or_init(|| tokio::sync::Mutex::new(()))
                    .lock()
                    .await,
            )
        } else {
            None
        };
        if is_central && GlobalConfig::is_central_redis_running() {
            debug!("Central Redis already running");
            Self::wait_for_redis_ready(config).await?;
            return Ok(());
        }
        let (pid_file, work_dir) = if is_central {
            let global_dir = GlobalConfig::config_dir()?;
            std::fs::create_dir_all(&global_dir)?;
            (GlobalConfig::redis_pid_path()?, global_dir)
        } else {
            (config.root.join(REDIS_PID_FILE), config.root.clone())
        };
        let redis_bin = find_redis_server();
        debug!("Using Redis binary: {}", redis_bin.display());
        // Arguments are OsStrings so non-UTF-8 paths are passed through instead of
        // panicking on `to_str().unwrap()`.
        let mut args: Vec<OsString> = vec![
            OsString::from("--daemonize"),
            OsString::from("yes"),
            OsString::from("--pidfile"),
            pid_file.into_os_string(),
            OsString::from("--loglevel"),
            OsString::from("warning"),
        ];
        if config.redis.use_socket {
            let socket_path = config.socket_path();
            // Remove a stale socket file left by a previous run.
            if socket_path.exists() {
                std::fs::remove_file(&socket_path)?;
            }
            info!("Starting Redis with socket: {}", socket_path.display());
            args.extend([
                OsString::from("--unixsocket"),
                socket_path.as_os_str().to_os_string(),
                OsString::from("--unixsocketperm"),
                OsString::from("700"),
                // `--port 0` disables Redis's plain TCP listener.
                OsString::from("--port"),
                OsString::from("0"),
            ]);
        } else {
            info!(
                "Starting Redis with TCP on {}:{}",
                config.redis.bind, config.redis.port
            );
            if config.redis.tls_enabled {
                // Serve TLS on the configured port and disable the plaintext listener.
                args.extend([
                    OsString::from("--tls-port"),
                    OsString::from(config.redis.port.to_string()),
                    OsString::from("--port"),
                    OsString::from("0"),
                ]);
                if let Some(ref cert) = config.redis.tls_cert {
                    args.extend([OsString::from("--tls-cert-file"), OsString::from(cert)]);
                }
                if let Some(ref key) = config.redis.tls_key {
                    args.extend([OsString::from("--tls-key-file"), OsString::from(key)]);
                }
                if let Some(ref ca_cert) = config.redis.tls_ca_cert {
                    args.extend([
                        OsString::from("--tls-ca-cert-file"),
                        OsString::from(ca_cert),
                    ]);
                }
            } else {
                args.extend([
                    OsString::from("--port"),
                    OsString::from(config.redis.port.to_string()),
                ]);
            }
            args.extend([
                OsString::from("--bind"),
                OsString::from(&config.redis.bind),
            ]);
            let password = config.redis_password();
            if let Some(ref password) = password {
                // NOTE(review): the password is passed on the command line and may be
                // visible to other local users via the process list; consider supplying
                // it through a config file instead.
                args.extend([OsString::from("--requirepass"), OsString::from(password)]);
            }
            if config.redis.bind != "127.0.0.1" && password.is_none() {
                warn!(
                    "Binding to {} without password - enabling protected mode",
                    config.redis.bind
                );
                args.extend([OsString::from("--protected-mode"), OsString::from("yes")]);
            }
        }
        if is_central {
            info!(
                "Starting central Redis on {}:{}",
                config.redis.host, config.redis.port
            );
        }
        // Use tokio's process API so waiting on the (daemonizing) launcher does not block
        // an async runtime worker thread.
        let status = tokio::process::Command::new(&redis_bin)
            .args(&args)
            .current_dir(&work_dir)
            .status()
            .await?;
        if !status.success() {
            return Err(Error::Timeout("Redis failed to start".into()));
        }
        Self::wait_for_redis_ready(config).await
    }

    /// Opens a Redis client for `config` and runs the startup health check.
    ///
    /// Then builds a `ConnectionManager`, with a 5 s timeout, wrapped in a `Channel` for
    /// this town.
    ///
    /// # Errors
    /// - `Error::Config` for an invalid URL or a failed health check.
    /// - `Error::Timeout` if the connection manager cannot be created in time.
    async fn connect_redis(config: &Config) -> Result<Channel> {
        let url = config.redis_url();
        debug!("Connecting to Redis: {}", config.redis_url_redacted());
        let client = Client::open(url).map_err(|err| {
            Error::Config(format!(
                "Invalid Redis configuration for {}: {}",
                config.redis_url_redacted(),
                err
            ))
        })?;
        Self::run_redis_startup_health_check(&client, config).await?;
        let conn = tokio::time::timeout(Duration::from_secs(5), ConnectionManager::new(client))
            .await
            .map_err(|_| Error::Timeout("Redis connection timed out".into()))??;
        Ok(Channel::new(conn, &config.name))
    }

    /// Verifies the configured Redis before use.
    ///
    /// It must accept a connection within 5 s, answer `PING` with `PONG`, and report
    /// `INFO server` with `redis_version` >= `MIN_HEALTHCHECK_REDIS_VERSION`. Each
    /// command has a 2 s timeout.
    ///
    /// # Errors
    /// `Error::Config` describing the failure; the URL in the message is redacted.
    async fn run_redis_startup_health_check(client: &Client, config: &Config) -> Result<()> {
        let mut conn = tokio::time::timeout(
            Duration::from_secs(5),
            client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| {
            Self::redis_connection_error(
                config,
                "Timed out while opening the configured Redis connection",
            )
        })?
        .map_err(|err| Self::redis_connection_error(config, &err.to_string()))?;
        let response: String = tokio::time::timeout(
            Duration::from_secs(2),
            redis::cmd("PING").query_async(&mut conn),
        )
        .await
        .map_err(|_| Self::redis_connection_error(config, "Timed out waiting for Redis PING"))?
        .map_err(|err| Self::redis_connection_error(config, &err.to_string()))?;
        if response != "PONG" {
            return Err(Self::redis_connection_error(
                config,
                &format!("Unexpected PING response: {}", response),
            ));
        }
        let info: String = tokio::time::timeout(
            Duration::from_secs(2),
            redis::cmd("INFO").arg("server").query_async(&mut conn),
        )
        .await
        .map_err(|_| {
            Self::redis_connection_error(config, "Timed out while fetching Redis server info")
        })?
        .map_err(|err| Self::redis_connection_error(config, &err.to_string()))?;
        let version = Self::parse_info_redis_version(&info)?;
        if version < MIN_HEALTHCHECK_REDIS_VERSION {
            return Err(Error::Config(format!(
                "Configured Redis at {} is version {}.{}. Tinytown requires Redis {}.{}+ for connectivity.",
                config.redis_url_redacted(),
                version.0,
                version.1,
                MIN_HEALTHCHECK_REDIS_VERSION.0,
                MIN_HEALTHCHECK_REDIS_VERSION.1
            )));
        }
        Ok(())
    }

    /// Polls every 100 ms for up to 10 s until the managed endpoint is reachable and
    /// answers `PING`.
    ///
    /// # Errors
    /// `Error::Timeout` if Redis is not ready before the deadline.
    async fn wait_for_redis_ready(config: &Config) -> Result<()> {
        let deadline = Instant::now() + Duration::from_secs(10);
        while Instant::now() < deadline {
            if Self::redis_endpoint_reachable(config) && Self::ping_redis(config).await.is_ok() {
                debug!("Redis ready");
                return Ok(());
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        Err(Error::Timeout("Redis failed to start".into()))
    }

    /// Cheap pre-check that the managed Redis endpoint is reachable.
    ///
    /// - Unix socket: checks only that the socket file exists.
    /// - TCP: resolves `bind:port` and tries each address with a bounded connect timeout.
    ///   An unreachable or filtered address therefore cannot stall the caller (and the
    ///   async executor) for the OS default connect timeout.
    ///
    /// Address resolution itself is still synchronous.
    fn redis_endpoint_reachable(config: &Config) -> bool {
        use std::net::{TcpStream, ToSocketAddrs};
        const TCP_PROBE_TIMEOUT: Duration = Duration::from_millis(500);

        if config.redis.use_socket {
            return config.socket_path().exists();
        }
        let endpoint = format!("{}:{}", config.redis.bind, config.redis.port);
        let Ok(mut addrs) = endpoint.to_socket_addrs() else {
            return false;
        };
        addrs.any(|addr| TcpStream::connect_timeout(&addr, TCP_PROBE_TIMEOUT).is_ok())
    }

    /// Opens a fresh connection (1 s timeout) and expects `PONG` to a `PING` (1 s timeout).
    ///
    /// # Errors
    /// `Error::Timeout` on timeouts or an unexpected reply; Redis client errors are
    /// propagated via `?`.
    async fn ping_redis(config: &Config) -> Result<()> {
        let client = Client::open(config.redis_url())?;
        let mut conn = tokio::time::timeout(
            Duration::from_secs(1),
            client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| Error::Timeout("Redis connection timed out".into()))??;
        let response: String = tokio::time::timeout(
            Duration::from_secs(1),
            redis::cmd("PING").query_async(&mut conn),
        )
        .await
        .map_err(|_| Error::Timeout("Redis ping timed out".into()))??;
        if response == "PONG" {
            Ok(())
        } else {
            Err(Error::Timeout(format!(
                "Unexpected Redis PING response: {}",
                response
            )))
        }
    }

    /// Extracts `(major, minor)` from the `redis_version:` line of an `INFO` response.
    ///
    /// Surrounding whitespace on the value is ignored.
    ///
    /// # Errors
    /// `Error::Config` if the field is missing, or if it has fewer than two dot-separated
    /// numeric components.
    fn parse_info_redis_version(info: &str) -> Result<(u32, u32)> {
        let version = info
            .lines()
            .find_map(|line| line.strip_prefix("redis_version:"))
            .map(str::trim)
            .ok_or_else(|| {
                Error::Config("Redis INFO response did not include redis_version".into())
            })?;
        // Closure only borrows `version`, so it is `Copy` and can be reused below.
        let invalid = || {
            Error::Config(format!(
                "Redis INFO reported an invalid redis_version value: {version}"
            ))
        };
        let mut parts = version.split('.');
        let major = parts
            .next()
            .and_then(|part| part.parse::<u32>().ok())
            .ok_or_else(invalid)?;
        let minor = parts
            .next()
            .and_then(|part| part.parse::<u32>().ok())
            .ok_or_else(invalid)?;
        Ok((major, minor))
    }

    /// Builds an `Error::Config` for a Redis connection failure.
    ///
    /// If `detail` mentions `WRONGPASS`/`NOAUTH` (case-insensitive), the message is
    /// labelled as an authentication failure. The URL in the message is redacted.
    fn redis_connection_error(config: &Config, detail: &str) -> Error {
        let detail = detail.trim();
        let uppercase = detail.to_ascii_uppercase();
        let prefix = if uppercase.contains("WRONGPASS") || uppercase.contains("NOAUTH") {
            "Redis authentication failed"
        } else {
            "Failed to connect to configured Redis"
        };
        Error::Config(format!(
            "{prefix} at {}: {detail}",
            config.redis_url_redacted()
        ))
    }

    /// Returns true if `name`, trimmed and lowercased, is `supervisor` or `conductor`.
    fn is_reserved_agent_name(name: &str) -> bool {
        matches!(
            name.trim().to_lowercase().as_str(),
            "supervisor" | "conductor"
        )
    }

    /// Registers a new worker agent named `name` that uses `cli`.
    ///
    /// The agent's state is stored in the channel and recorded in this handle's agent map.
    ///
    /// # Errors
    /// - `Error::Config` if `name` is a reserved supervisor/conductor alias.
    /// - Channel errors while storing the agent state.
    pub async fn spawn_agent(&self, name: &str, cli: &str) -> Result<AgentHandle> {
        if Self::is_reserved_agent_name(name) {
            return Err(Error::Config(format!(
                "'{}' is reserved for the well-known supervisor/conductor mailbox",
                name
            )));
        }
        let agent = Agent::new(name, cli, AgentType::Worker);
        let id = agent.id;
        self.channel.set_agent_state(&agent).await?;
        self.agents.write().await.insert(id, agent);
        info!("Spawned agent '{}' ({})", name, id);
        Ok(AgentHandle {
            id,
            channel: self.channel.clone(),
        })
    }

    /// Returns a handle for the agent named `name`.
    ///
    /// Lookup order:
    /// 1. Agents stored in the channel.
    /// 2. Reserved supervisor/conductor aliases, which map to `AgentId::supervisor()`.
    ///
    /// # Errors
    /// `Error::AgentNotFound` if neither lookup matches; channel errors are propagated.
    pub async fn agent(&self, name: &str) -> Result<AgentHandle> {
        if let Some(agent) = self.channel.get_agent_by_name(name).await? {
            return Ok(AgentHandle {
                id: agent.id,
                channel: self.channel.clone(),
            });
        }
        if Self::is_reserved_agent_name(name) {
            return Ok(AgentHandle {
                id: AgentId::supervisor(),
                channel: self.channel.clone(),
            });
        }
        Err(Error::AgentNotFound(name.to_string()))
    }

    /// Lists agents known to the channel.
    ///
    /// Best-effort: a channel error is logged and an empty list is returned.
    pub async fn list_agents(&self) -> Vec<Agent> {
        match self.channel.list_agents().await {
            Ok(agents) => agents,
            Err(err) => {
                warn!("Failed to list agents: {}", err);
                Vec::new()
            }
        }
    }

    /// Returns the town's message channel.
    pub fn channel(&self) -> &Channel {
        &self.channel
    }

    /// Returns the town configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Returns the town root directory.
    pub fn root(&self) -> &Path {
        &self.config.root
    }

    /// Creates an `EventStream` over a clone of the channel's connection, scoped to this
    /// town's name.
    pub fn event_stream(&self) -> EventStream {
        EventStream::new(self.channel.conn().clone(), self.channel.town_name())
    }
}
/// Lightweight handle for interacting with a single agent through the town channel.
///
/// Produced by `Town::spawn_agent` and `Town::agent`.
#[derive(Clone)]
pub struct AgentHandle {
    // Target agent's id; may be `AgentId::supervisor()` for the reserved aliases.
    id: AgentId,
    // Clone of the town's channel used for all reads and sends.
    channel: Channel,
}
impl AgentHandle {
pub fn id(&self) -> AgentId {
self.id
}
pub async fn assign(&self, task: Task) -> Result<TaskId> {
let task_id = task.id;
self.channel.set_task(&task).await?;
let msg = Message::new(
AgentId::supervisor(),
self.id,
MessageType::TaskAssign {
task_id: task_id.to_string(),
},
);
self.channel.send(&msg).await?;
Ok(task_id)
}
pub async fn send(&self, msg_type: MessageType) -> Result<()> {
let msg = Message::new(AgentId::supervisor(), self.id, msg_type);
self.channel.send(&msg).await
}
pub async fn inbox_len(&self) -> Result<usize> {
self.channel.inbox_len(self.id).await
}
pub async fn state(&self) -> Result<Option<Agent>> {
self.channel.get_agent_state(self.id).await
}
pub async fn wait(&self) -> Result<()> {
loop {
if let Some(agent) = self.state().await? {
match agent.state {
AgentState::Idle | AgentState::Stopped | AgentState::Cold => return Ok(()),
AgentState::Error => {
return Err(Error::AgentNotFound(format!(
"Agent {} in error state",
self.id
)));
}
_ => {}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
#[cfg(test)]
mod tests {
    use super::Town;
    use crate::Error;

    /// A non-numeric `redis_version` in INFO output must surface as a `Config` error.
    ///
    /// The error must name the bad value rather than a version-requirement or bootstrap
    /// hint.
    #[test]
    fn parse_info_redis_version_rejects_invalid_values_with_config_error() {
        let outcome = Town::parse_info_redis_version("redis_version:not-a-version");
        let failure = outcome.expect_err("invalid INFO version should fail");

        let text = match failure {
            Error::Config(text) => text,
            other => panic!("expected config error, got {other}"),
        };

        for expected in ["invalid redis_version value", "not-a-version"] {
            assert!(text.contains(expected));
        }
        for forbidden in ["requires Redis 8.0+", "tt bootstrap"] {
            assert!(!text.contains(forbidden));
        }
    }
}