use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::clock::Clock;
use crate::ffmpeg::{Ffmpeg, FfmpegError, WebpEncodeSettings};
use crate::fs::{FileStat, Filesystem, FsError};
use crate::http::{Http, HttpRequest, HttpResponse, TransportError};
pub(crate) struct Rule {
url_contains: &'static str,
status: u16,
body: String,
}
impl Rule {
pub(crate) fn new(url_contains: &'static str, status: u16, body: String) -> Self {
Self {
url_contains,
status,
body,
}
}
}
pub(crate) struct MockHttp {
rules: Vec<Rule>,
}
impl MockHttp {
pub(crate) fn new(rules: Vec<Rule>) -> Self {
Self { rules }
}
}
impl Http for MockHttp {
fn send(
&self,
request: HttpRequest,
) -> impl Future<Output = Result<HttpResponse, TransportError>> + Send {
let reply = self
.rules
.iter()
.find(|rule| request.url.contains(rule.url_contains))
.map(|rule| HttpResponse {
status: rule.status,
headers: Vec::new(),
body: rule.body.clone().into_bytes(),
})
.ok_or_else(|| TransportError(format!("no rule matched {}", request.url)));
async move { reply }
}
}
#[derive(Clone)]
pub(crate) struct Reply {
status: u16,
headers: Vec<(String, String)>,
body: Vec<u8>,
}
impl Reply {
pub(crate) fn ok(body: impl Into<Vec<u8>>) -> Self {
Self {
status: 200,
headers: Vec::new(),
body: body.into(),
}
}
pub(crate) fn json(body: &str) -> Self {
Self::ok(body.as_bytes().to_vec())
}
pub(crate) fn status(status: u16) -> Self {
Self {
status,
headers: Vec::new(),
body: Vec::new(),
}
}
pub(crate) fn with_header(mut self, name: &str, value: &str) -> Self {
self.headers.push((name.to_owned(), value.to_owned()));
self
}
pub(crate) fn with_content_length(self, len: u64) -> Self {
self.with_header("content-length", &len.to_string())
}
pub(crate) fn with_retry_after(self, seconds: u64) -> Self {
self.with_header("retry-after", &seconds.to_string())
}
}
struct Route {
url_contains: String,
replies: VecDeque<Reply>,
}
pub(crate) struct ScriptedHttp {
routes: Mutex<Vec<Route>>,
log: Mutex<Vec<String>>,
}
impl ScriptedHttp {
pub(crate) fn new() -> Self {
Self {
routes: Mutex::new(Vec::new()),
log: Mutex::new(Vec::new()),
}
}
pub(crate) fn with_auth(self) -> Self {
let client_body = serde_json::json!({
"response": {
"last_active_session_id": "s",
"sessions": [{"id": "s", "user": {"id": "u", "username": "h"}}]
}
})
.to_string();
self.route("/v1/client/sessions/", Reply::json(r#"{"jwt": "a.b.c"}"#))
.route("/v1/client", Reply::json(&client_body))
}
pub(crate) fn route(self, url_contains: &str, reply: Reply) -> Self {
self.route_seq(url_contains, vec![reply])
}
pub(crate) fn route_seq(self, url_contains: &str, replies: Vec<Reply>) -> Self {
self.routes.lock().unwrap().push(Route {
url_contains: url_contains.to_owned(),
replies: replies.into(),
});
self
}
pub(crate) fn calls(&self) -> Vec<String> {
self.log.lock().unwrap().clone()
}
pub(crate) fn count(&self, needle: &str) -> usize {
self.log
.lock()
.unwrap()
.iter()
.filter(|url| url.contains(needle))
.count()
}
}
impl Http for ScriptedHttp {
fn send(
&self,
request: HttpRequest,
) -> impl Future<Output = Result<HttpResponse, TransportError>> + Send {
self.log.lock().unwrap().push(request.url.clone());
let reply = {
let mut routes = self.routes.lock().unwrap();
routes
.iter_mut()
.find(|route| request.url.contains(&route.url_contains))
.map(|route| {
if route.replies.len() > 1 {
route.replies.pop_front().expect("len checked")
} else {
route.replies.front().expect("route has no replies").clone()
}
})
};
let out = match reply {
Some(reply) => Ok(HttpResponse {
status: reply.status,
headers: reply.headers,
body: reply.body,
}),
None => Err(TransportError(format!("no route matched {}", request.url))),
};
async move { out }
}
}
pub(crate) struct MemFs {
files: Mutex<HashMap<String, Vec<u8>>>,
dirs: Mutex<BTreeSet<String>>,
fail_writes: Mutex<HashSet<String>>,
corrupt_writes: Mutex<HashSet<String>>,
fail_removes: Mutex<HashSet<String>>,
}
impl MemFs {
pub(crate) fn new() -> Self {
Self {
files: Mutex::new(HashMap::new()),
dirs: Mutex::new(BTreeSet::new()),
fail_writes: Mutex::new(HashSet::new()),
corrupt_writes: Mutex::new(HashSet::new()),
fail_removes: Mutex::new(HashSet::new()),
}
}
pub(crate) fn with_file(self, path: &str, bytes: impl Into<Vec<u8>>) -> Self {
self.files
.lock()
.unwrap()
.insert(path.to_owned(), bytes.into());
register_parent_dirs(&mut self.dirs.lock().unwrap(), path);
self
}
pub(crate) fn with_dir(self, path: &str) -> Self {
register_dir_chain(&mut self.dirs.lock().unwrap(), path);
self
}
pub(crate) fn fail_write(self, path: &str) -> Self {
self.fail_writes.lock().unwrap().insert(path.to_owned());
self
}
pub(crate) fn corrupt_write(self, path: &str) -> Self {
self.corrupt_writes.lock().unwrap().insert(path.to_owned());
self
}
pub(crate) fn fail_remove(self, path: &str) -> Self {
self.fail_removes.lock().unwrap().insert(path.to_owned());
self
}
pub(crate) fn read_file(&self, path: &str) -> Option<Vec<u8>> {
self.files.lock().unwrap().get(path).cloned()
}
pub(crate) fn exists(&self, path: &str) -> bool {
self.files.lock().unwrap().contains_key(path)
}
pub(crate) fn paths(&self) -> Vec<String> {
let mut paths: Vec<String> = self.files.lock().unwrap().keys().cloned().collect();
paths.sort();
paths
}
pub(crate) fn has_dir(&self, path: &str) -> bool {
self.dirs.lock().unwrap().contains(path)
}
pub(crate) fn file_count(&self) -> usize {
self.files.lock().unwrap().len()
}
pub(crate) fn arm_fail_write(&self, path: &str) {
self.fail_writes.lock().unwrap().insert(path.to_owned());
}
pub(crate) fn disarm_fail_write(&self, path: &str) {
self.fail_writes.lock().unwrap().remove(path);
}
pub(crate) fn arm_fail_remove(&self, path: &str) {
self.fail_removes.lock().unwrap().insert(path.to_owned());
}
pub(crate) fn disarm_fail_remove(&self, path: &str) {
self.fail_removes.lock().unwrap().remove(path);
}
pub(crate) fn arm_corrupt_write(&self, path: &str) {
self.corrupt_writes.lock().unwrap().insert(path.to_owned());
}
}
impl Filesystem for MemFs {
fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
if self.fail_writes.lock().unwrap().contains(path) {
return Err(FsError::new(format!("simulated write failure: {path}")));
}
let stored = if self.corrupt_writes.lock().unwrap().contains(path) {
vec![0u8; bytes.len() + 1]
} else {
bytes.to_vec()
};
self.files.lock().unwrap().insert(path.to_owned(), stored);
register_parent_dirs(&mut self.dirs.lock().unwrap(), path);
Ok(())
}
fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
let mut files = self.files.lock().unwrap();
match files.remove(from) {
Some(bytes) => {
files.insert(to.to_owned(), bytes);
register_parent_dirs(&mut self.dirs.lock().unwrap(), to);
Ok(())
}
None => Err(FsError::new(format!("rename source missing: {from}"))),
}
}
fn remove(&self, path: &str) -> Result<(), FsError> {
if self.fail_removes.lock().unwrap().contains(path) {
return Err(FsError::new(format!("simulated remove failure: {path}")));
}
self.files.lock().unwrap().remove(path);
Ok(())
}
fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
let file_paths: Vec<String> = self.files.lock().unwrap().keys().cloned().collect();
let mut dirs = self.dirs.lock().unwrap();
loop {
let snapshot: Vec<String> = dirs.iter().cloned().collect();
let victim = snapshot.iter().find(|d| {
strictly_under(d, root)
&& !file_paths.iter().any(|f| strictly_under(f, d))
&& !snapshot.iter().any(|o| strictly_under(o, d))
});
match victim {
Some(d) => {
dirs.remove(d);
}
None => return Ok(()),
}
}
}
fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
self.files
.lock()
.unwrap()
.get(path)
.cloned()
.ok_or_else(|| FsError::new(format!("no such file: {path}")))
}
fn metadata(&self, path: &str) -> Option<FileStat> {
self.files.lock().unwrap().get(path).map(|bytes| FileStat {
exists: true,
size: bytes.len() as u64,
})
}
}
fn register_parent_dirs(dirs: &mut BTreeSet<String>, path: &str) {
let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
for i in 1..parts.len() {
dirs.insert(parts[..i].join("/"));
}
}
fn register_dir_chain(dirs: &mut BTreeSet<String>, path: &str) {
let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
for i in 1..=parts.len() {
dirs.insert(parts[..i].join("/"));
}
}
fn strictly_under(path: &str, base: &str) -> bool {
if base.is_empty() {
!path.is_empty()
} else {
path.len() > base.len() && path.starts_with(base) && path.as_bytes()[base.len()] == b'/'
}
}
pub(crate) struct StubFfmpeg {
output: Vec<u8>,
fail: bool,
}
impl StubFfmpeg {
pub(crate) fn flac() -> Self {
Self {
output: minimal_flac(),
fail: false,
}
}
pub(crate) fn webp() -> Self {
Self {
output: b"RIFF\x00\x00\x00\x00WEBP-canned-anim".to_vec(),
fail: false,
}
}
pub(crate) fn failing() -> Self {
Self {
output: Vec::new(),
fail: true,
}
}
}
impl Ffmpeg for StubFfmpeg {
fn wav_to_flac(
&self,
_wav: &[u8],
) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
let out = if self.fail {
Err(FfmpegError::new("simulated transcode failure"))
} else {
Ok(self.output.clone())
};
async move { out }
}
fn mp4_to_webp(
&self,
_mp4: &[u8],
_settings: WebpEncodeSettings,
) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
let out = if self.fail {
Err(FfmpegError::new("simulated transcode failure"))
} else {
Ok(self.output.clone())
};
async move { out }
}
}
#[derive(Clone)]
pub(crate) struct RecordingClock {
sleeps: Arc<Mutex<Vec<Duration>>>,
}
impl RecordingClock {
pub(crate) fn new() -> Self {
Self {
sleeps: Arc::new(Mutex::new(Vec::new())),
}
}
pub(crate) fn sleeps(&self) -> Vec<Duration> {
self.sleeps.lock().unwrap().clone()
}
}
impl Clock for RecordingClock {
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send {
self.sleeps.lock().unwrap().push(duration);
async {}
}
}
pub(crate) fn minimal_flac() -> Vec<u8> {
let mut streaminfo = vec![0u8; 34];
streaminfo[0..2].copy_from_slice(&4096u16.to_be_bytes());
streaminfo[2..4].copy_from_slice(&4096u16.to_be_bytes());
let sample_rate: u64 = 44_100;
let channels: u64 = 2;
let bits_per_sample: u64 = 16;
let total_samples: u64 = 44_100;
let packed: u64 = (sample_rate << 44)
| ((channels - 1) << 41)
| ((bits_per_sample - 1) << 36)
| total_samples;
streaminfo[10..18].copy_from_slice(&packed.to_be_bytes());
let mut out = Vec::new();
out.extend_from_slice(b"fLaC");
out.push(0x80);
out.extend_from_slice(&[0x00, 0x00, 0x22]);
out.extend_from_slice(&streaminfo);
out.extend_from_slice(b"\xFF\xF8audio-frame-payload");
out
}
#[derive(Clone)]
pub(crate) enum Outcome {
Transport(String),
Reply(Reply),
}
impl Outcome {
pub(crate) fn ok(body: impl Into<Vec<u8>>) -> Self {
Outcome::Reply(Reply::ok(body))
}
pub(crate) fn status(status: u16) -> Self {
Outcome::Reply(Reply::status(status))
}
pub(crate) fn transport(reason: &str) -> Self {
Outcome::Transport(reason.to_owned())
}
pub(crate) fn truncated(body: impl Into<Vec<u8>>, advertised: u64) -> Self {
Outcome::Reply(Reply::ok(body).with_content_length(advertised))
}
}
struct ChaosRoute {
url_contains: String,
program: VecDeque<Outcome>,
}
pub(crate) struct ChaosHttp {
routes: Mutex<Vec<ChaosRoute>>,
log: Mutex<Vec<String>>,
}
impl ChaosHttp {
pub(crate) fn new() -> Self {
Self {
routes: Mutex::new(Vec::new()),
log: Mutex::new(Vec::new()),
}
}
pub(crate) fn with_auth(self) -> Self {
let client_body = serde_json::json!({
"response": {
"last_active_session_id": "s",
"sessions": [{"id": "s", "user": {"id": "u", "username": "h"}}]
}
})
.to_string();
self.serve("/v1/client/sessions/", br#"{"jwt": "a.b.c"}"#.to_vec())
.serve("/v1/client", client_body.into_bytes())
}
pub(crate) fn serve(self, url_contains: &str, body: impl Into<Vec<u8>>) -> Self {
self.program(url_contains, vec![Outcome::ok(body)])
}
pub(crate) fn program(self, url_contains: &str, outcomes: Vec<Outcome>) -> Self {
self.routes.lock().unwrap().push(ChaosRoute {
url_contains: url_contains.to_owned(),
program: outcomes.into(),
});
self
}
pub(crate) fn count(&self, needle: &str) -> usize {
self.log
.lock()
.unwrap()
.iter()
.filter(|url| url.contains(needle))
.count()
}
fn next_outcome(&self, url: &str) -> Outcome {
let mut routes = self.routes.lock().unwrap();
match routes
.iter_mut()
.find(|route| url.contains(&route.url_contains))
{
Some(route) if route.program.len() > 1 => {
route.program.pop_front().expect("len checked")
}
Some(route) => route
.program
.front()
.cloned()
.expect("route has at least one outcome"),
None => Outcome::status(404),
}
}
}
impl Http for ChaosHttp {
fn send(
&self,
request: HttpRequest,
) -> impl Future<Output = Result<HttpResponse, TransportError>> + Send {
self.log.lock().unwrap().push(request.url.clone());
let out = match self.next_outcome(&request.url) {
Outcome::Transport(reason) => Err(TransportError(reason)),
Outcome::Reply(reply) => Ok(HttpResponse {
status: reply.status,
headers: reply.headers,
body: reply.body,
}),
};
async move { out }
}
}