use anyhow::{Context, Result, anyhow, bail};
use regex::Regex;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::{Duration, sleep};
#[derive(Clone, Copy, Debug)]
pub enum RiverUser {
Alice,
Bob,
}
#[derive(Clone, Debug)]
pub struct RetryConfig {
pub max_retries: usize,
pub retry_delay: Duration,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
retry_delay: Duration::from_secs(5),
}
}
}
impl RetryConfig {
pub fn no_retry() -> Self {
Self {
max_retries: 1,
retry_delay: Duration::ZERO,
}
}
}
pub struct RiverSession {
riverctl: PathBuf,
alice_dir: TempDir,
bob_dir: TempDir,
alice_url: String,
bob_url: String,
room_key: String,
invite_regex: Regex,
room_regex: Regex,
retry_config: RetryConfig,
}
impl RiverSession {
pub async fn initialize(riverctl: PathBuf, alice_url: String, bob_url: String) -> Result<Self> {
Self::initialize_with_retry(riverctl, alice_url, bob_url, RetryConfig::default()).await
}
pub async fn initialize_with_retry(
riverctl: PathBuf,
alice_url: String,
bob_url: String,
retry_config: RetryConfig,
) -> Result<Self> {
let alice_dir = TempDir::new().context("failed to create Alice temp config dir")?;
let bob_dir = TempDir::new().context("failed to create Bob temp config dir")?;
let mut session = Self {
riverctl,
alice_dir,
bob_dir,
alice_url,
bob_url,
room_key: String::new(),
invite_regex: Regex::new(r"[A-Za-z0-9+/=]{40,}").unwrap(),
room_regex: Regex::new(r"[A-Za-z0-9]{40,}").unwrap(),
retry_config,
};
session.setup_room().await?;
Ok(session)
}
pub async fn initialize_no_retry(
riverctl: PathBuf,
alice_url: String,
bob_url: String,
) -> Result<Self> {
Self::initialize_with_retry(riverctl, alice_url, bob_url, RetryConfig::no_retry()).await
}
async fn setup_room(&mut self) -> Result<()> {
let create_output = self
.run_riverctl(
RiverUser::Alice,
&[
"room",
"create",
"--name",
"test-room",
"--nickname",
"Alice",
],
)
.await?;
self.room_key = self
.room_regex
.find(&create_output)
.map(|m| m.as_str().to_string())
.ok_or_else(|| anyhow!("failed to parse room owner key from riverctl output"))?;
let invite_output = self
.run_riverctl(
RiverUser::Alice,
&["invite", "create", self.room_key.as_str()],
)
.await?;
let invitation_code = self
.invite_regex
.find_iter(&invite_output)
.filter(|m| m.as_str() != self.room_key)
.last()
.map(|m| m.as_str().to_string())
.ok_or_else(|| anyhow!("failed to parse invitation code from riverctl output"))?;
self.run_riverctl(
RiverUser::Bob,
&["invite", "accept", &invitation_code, "--nickname", "Bob"],
)
.await?;
Ok(())
}
pub async fn send_message(&self, user: RiverUser, body: &str) -> Result<()> {
self.run_riverctl(user, &["message", "send", self.room_key.as_str(), body])
.await
.map(|_| ())
}
pub async fn list_messages(&self, user: RiverUser) -> Result<String> {
self.run_riverctl(user, &["message", "list", self.room_key.as_str()])
.await
}
pub fn room_key(&self) -> &str {
&self.room_key
}
pub async fn run_riverctl(&self, user: RiverUser, args: &[&str]) -> Result<String> {
let (url, config_dir) = match user {
RiverUser::Alice => (&self.alice_url, self.alice_dir.path()),
RiverUser::Bob => (&self.bob_url, self.bob_dir.path()),
};
for attempt in 1..=self.retry_config.max_retries {
let mut cmd = tokio::process::Command::new(&self.riverctl);
cmd.arg("--node-url").arg(url);
cmd.args(args);
cmd.env("RIVER_CONFIG_DIR", config_dir);
let output = cmd
.output()
.await
.context("failed to execute riverctl command")?;
if output.status.success() {
return Ok(String::from_utf8_lossy(&output.stdout).to_string());
}
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let retriable = stderr.contains("Timeout waiting for")
|| stderr.contains("connection refused")
|| stderr.contains("HTTP request failed");
if attempt == self.retry_config.max_retries || !retriable {
bail!("riverctl failed (user {:?}): {}", user, stderr);
}
println!(
"riverctl attempt {}/{} failed for {:?}: {}; retrying in {}s",
attempt,
self.retry_config.max_retries,
user,
stderr.trim(),
self.retry_config.retry_delay.as_secs()
);
sleep(self.retry_config.retry_delay).await;
}
unreachable!("riverctl retry loop should always return or bail")
}
}