use std::future::Future;
use std::marker::PhantomData;
use crate::effect::boxed::BoxFuture;
use crate::effect::trait_def::Effect;
/// Error reported by a bracket that distinguishes which phase failed.
///
/// `E` is the single error type shared by the acquire, use, and cleanup
/// phases; the variant records which phase (or phases) produced it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BracketError<E> {
    /// The resource could not be acquired.
    AcquireError(E),
    /// The use phase failed.
    UseError(E),
    /// The cleanup (release) phase failed.
    CleanupError(E),
    /// Both the use phase and the cleanup phase failed.
    Both {
        /// Error returned by the use phase.
        use_error: E,
        /// Error returned by the cleanup phase.
        cleanup_error: E,
    },
}

impl<E> BracketError<E> {
    /// Returns the acquire error, or `None` for every other variant.
    pub fn acquire_error(&self) -> Option<&E> {
        match self {
            BracketError::AcquireError(e) => Some(e),
            BracketError::UseError(_)
            | BracketError::CleanupError(_)
            | BracketError::Both { .. } => None,
        }
    }

    /// Returns the use-phase error carried by `UseError` or `Both`, if any.
    pub fn use_error(&self) -> Option<&E> {
        match self {
            BracketError::UseError(e) | BracketError::Both { use_error: e, .. } => Some(e),
            BracketError::AcquireError(_) | BracketError::CleanupError(_) => None,
        }
    }

    /// Returns the cleanup error carried by `CleanupError` or `Both`, if any.
    pub fn cleanup_error(&self) -> Option<&E> {
        match self {
            BracketError::CleanupError(e)
            | BracketError::Both {
                cleanup_error: e, ..
            } => Some(e),
            BracketError::AcquireError(_) | BracketError::UseError(_) => None,
        }
    }

    /// Converts every contained error with `f`, preserving the variant.
    ///
    /// For `Both`, `f` is called twice: first on the use error, then on the
    /// cleanup error. `f` may be any `FnMut`, so closures that carry state
    /// (counters, collectors) are accepted as well as plain `Fn` closures.
    pub fn map<F, E2>(self, mut f: F) -> BracketError<E2>
    where
        F: FnMut(E) -> E2,
    {
        match self {
            BracketError::AcquireError(e) => BracketError::AcquireError(f(e)),
            BracketError::UseError(e) => BracketError::UseError(f(e)),
            BracketError::CleanupError(e) => BracketError::CleanupError(f(e)),
            BracketError::Both {
                use_error,
                cleanup_error,
            } => BracketError::Both {
                use_error: f(use_error),
                cleanup_error: f(cleanup_error),
            },
        }
    }
}
impl<E: std::fmt::Display> std::fmt::Display for BracketError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BracketError::AcquireError(e) => write!(f, "acquire failed: {}", e),
BracketError::UseError(e) => write!(f, "{}", e),
BracketError::CleanupError(e) => write!(f, "cleanup failed: {}", e),
BracketError::Both {
use_error,
cleanup_error,
} => {
write!(
f,
"use failed: {}; cleanup also failed: {}",
use_error, cleanup_error
)
}
}
}
}
impl<E: std::error::Error + 'static> std::error::Error for BracketError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
BracketError::AcquireError(e) => Some(e),
BracketError::UseError(e) => Some(e),
BracketError::Both { use_error, .. } => Some(use_error),
BracketError::CleanupError(e) => Some(e),
}
}
}
/// Effect that acquires a resource, uses it, and then releases it.
///
/// When run, the release function is invoked after the use effect finishes,
/// whether the use effect succeeded or failed; a release error is only logged.
pub struct Bracket<Acquire, Use, Release> {
/// Effect that produces the resource.
acquire: Acquire,
/// Function that builds the use effect from a reference to the resource.
use_fn: Use,
/// Function that consumes the resource and returns the cleanup future.
release: Release,
}
/// Debug output that prints fixed placeholders, so no `Debug` bound is
/// needed on the type parameters.
impl<Acquire, Use, Release> std::fmt::Debug for Bracket<Acquire, Use, Release> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Bracket");
        out.field("acquire", &"<effect>");
        out.field("use_fn", &"<function>");
        out.field("release", &"<function>");
        out.finish()
    }
}
impl<Acquire, Use, Release> Bracket<Acquire, Use, Release> {
pub fn new(acquire: Acquire, use_fn: Use, release: Release) -> Self {
Bracket {
acquire,
use_fn,
release,
}
}
}
impl<Acquire, Use, Release, UseEffect, R, T, E, Env, RelFut> Effect
    for Bracket<Acquire, Use, Release>
where
    Acquire: Effect<Output = R, Error = E, Env = Env>,
    Use: FnOnce(&R) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    Release: FnOnce(R) -> RelFut + Send,
    RelFut: Future<Output = Result<(), E>> + Send,
    R: Send,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Acquires, uses, then releases the resource.
    ///
    /// An acquire failure is returned immediately and neither `use_fn` nor
    /// `release` is called. Otherwise `release` is awaited after the use
    /// effect regardless of its outcome; a release error is logged (via
    /// `tracing` when that feature is on, stderr otherwise) and the use
    /// result is returned unchanged.
    async fn run(self, env: &Self::Env) -> Result<T, E> {
        let acquired = match self.acquire.run(env).await {
            Ok(acquired) => acquired,
            Err(acquire_failure) => return Err(acquire_failure),
        };

        let outcome = (self.use_fn)(&acquired).run(env).await;

        let cleanup = (self.release)(acquired).await;
        if let Err(cleanup_failure) = &cleanup {
            #[cfg(feature = "tracing")]
            tracing::warn!("Resource cleanup failed: {:?}", cleanup_failure);
            #[cfg(not(feature = "tracing"))]
            eprintln!("Resource cleanup failed: {:?}", cleanup_failure);
        }

        outcome
    }
}
pub fn bracket<Acquire, Use, Release, UseEffect, R, T, E, Env, RelFut>(
acquire: Acquire,
release: Release,
use_fn: Use,
) -> Bracket<Acquire, Use, Release>
where
Acquire: Effect<Output = R, Error = E, Env = Env>,
Use: FnOnce(&R) -> UseEffect + Send,
UseEffect: Effect<Output = T, Error = E, Env = Env>,
Release: FnOnce(R) -> RelFut + Send,
RelFut: Future<Output = Result<(), E>> + Send,
R: Send,
T: Send,
E: Send + std::fmt::Debug,
Env: Clone + Send + Sync,
{
Bracket::new(acquire, use_fn, release)
}
/// Bracket variant whose error type is `BracketError<E>`, so acquire, use,
/// and cleanup failures are all reported to the caller instead of logged.
pub struct BracketFull<Acquire, Use, Release> {
/// Effect that produces the resource.
acquire: Acquire,
/// Function that builds the use effect from a reference to the resource.
use_fn: Use,
/// Function that consumes the resource and returns the cleanup future.
release: Release,
}
/// Debug output that prints fixed placeholders, so no `Debug` bound is
/// needed on the type parameters.
impl<Acquire, Use, Release> std::fmt::Debug for BracketFull<Acquire, Use, Release> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BracketFull");
        out.field("acquire", &"<effect>");
        out.field("use_fn", &"<function>");
        out.field("release", &"<function>");
        out.finish()
    }
}
impl<Acquire, Use, Release> BracketFull<Acquire, Use, Release> {
pub fn new(acquire: Acquire, use_fn: Use, release: Release) -> Self {
BracketFull {
acquire,
use_fn,
release,
}
}
}
impl<Acquire, Use, Release, UseEffect, R, T, E, Env, RelFut> Effect
    for BracketFull<Acquire, Use, Release>
where
    Acquire: Effect<Output = R, Error = E, Env = Env>,
    Use: FnOnce(&R) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    Release: FnOnce(R) -> RelFut + Send,
    RelFut: Future<Output = Result<(), E>> + Send,
    R: Send,
    T: Send,
    E: Send,
    Env: Clone + Send + Sync,
{
    type Output = T;
    type Error = BracketError<E>;
    type Env = Env;

    /// Acquires, uses, then releases the resource, reporting every failure.
    ///
    /// * acquire fails -> `AcquireError` (use and release are not run)
    /// * use ok, release fails -> `CleanupError` (the use value is dropped)
    /// * use fails, release ok -> `UseError`
    /// * both fail -> `Both`
    async fn run(self, env: &Self::Env) -> Result<T, BracketError<E>> {
        let acquired = self
            .acquire
            .run(env)
            .await
            .map_err(BracketError::AcquireError)?;

        let used = (self.use_fn)(&acquired).run(env).await;
        // Release runs unconditionally once the resource exists.
        let cleaned = (self.release)(acquired).await;

        match used {
            Ok(value) => match cleaned {
                Ok(()) => Ok(value),
                Err(cleanup_error) => Err(BracketError::CleanupError(cleanup_error)),
            },
            Err(use_error) => Err(match cleaned {
                Ok(()) => BracketError::UseError(use_error),
                Err(cleanup_error) => BracketError::Both {
                    use_error,
                    cleanup_error,
                },
            }),
        }
    }
}
pub fn bracket_full<Acquire, Use, Release, UseEffect, R, T, E, Env, RelFut>(
acquire: Acquire,
release: Release,
use_fn: Use,
) -> BracketFull<Acquire, Use, Release>
where
Acquire: Effect<Output = R, Error = E, Env = Env>,
Use: FnOnce(&R) -> UseEffect + Send,
UseEffect: Effect<Output = T, Error = E, Env = Env>,
Release: FnOnce(R) -> RelFut + Send,
RelFut: Future<Output = Result<(), E>> + Send,
R: Send,
T: Send,
E: Send,
Env: Clone + Send + Sync,
{
BracketFull::new(acquire, use_fn, release)
}
/// Bracket variant whose release function is synchronous (`FnOnce(R) -> Result<(), E>`)
/// and which still runs the release when the use phase panics.
pub struct BracketSync<Acquire, Use, Release> {
/// Effect that produces the resource.
acquire: Acquire,
/// Function that builds the use effect from a reference to the resource.
use_fn: Use,
/// Synchronous function that consumes the resource and cleans it up.
release: Release,
}
/// Debug output that prints fixed placeholders, so no `Debug` bound is
/// needed on the type parameters.
impl<Acquire, Use, Release> std::fmt::Debug for BracketSync<Acquire, Use, Release> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BracketSync");
        out.field("acquire", &"<effect>");
        out.field("use_fn", &"<function>");
        out.field("release", &"<function>");
        out.finish()
    }
}
impl<Acquire, Use, Release> BracketSync<Acquire, Use, Release> {
pub fn new(acquire: Acquire, use_fn: Use, release: Release) -> Self {
BracketSync {
acquire,
use_fn,
release,
}
}
}
impl<Acquire, Use, Release, UseEffect, R, T, E, Env> Effect for BracketSync<Acquire, Use, Release>
where
    Acquire: Effect<Output = R, Error = E, Env = Env>,
    Use: FnOnce(&R) -> UseEffect + Send + std::panic::UnwindSafe,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    Release: FnOnce(R) -> Result<(), E> + Send,
    R: Send + std::panic::UnwindSafe,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync + std::panic::RefUnwindSafe,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Acquires, uses, then synchronously releases the resource, even if the
    /// use phase panics.
    ///
    /// * An acquire failure is returned immediately; nothing else runs.
    /// * A panic raised while building or polling the use effect is caught,
    ///   the release function is run, and the panic is then resumed.
    /// * A release error is only logged (at error level after a panic, at
    ///   warn level otherwise); the use result is returned unchanged.
    ///
    /// The use effect is awaited on the current task. It is deliberately NOT
    /// driven with a nested `block_on`: that would stall the executor thread
    /// for the whole use phase and can panic or deadlock depending on the
    /// runtime this effect is run under.
    async fn run(self, env: &Self::Env) -> Result<T, E> {
        let resource = self.acquire.run(env).await?;

        let use_fn = self.use_fn;
        // Constructing the use effect and its future is synchronous, so a
        // panic in `use_fn` itself is caught here. The closure is consumed
        // before any await point, so it never has to be `Send`.
        let started = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            use_fn(&resource).run(env)
        }));

        // Panics raised while the future is polled are caught per poll.
        let use_result = match started {
            Ok(use_future) => {
                futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(use_future)).await
            }
            Err(panic_payload) => Err(panic_payload),
        };

        // Cleanup runs on every path once the resource has been acquired.
        let release_result = (self.release)(resource);
        if let Err(ref rel_err) = release_result {
            if use_result.is_err() {
                #[cfg(feature = "tracing")]
                tracing::error!("Resource cleanup failed after panic: {:?}", rel_err);
                #[cfg(not(feature = "tracing"))]
                eprintln!("Resource cleanup failed after panic: {:?}", rel_err);
            } else {
                #[cfg(feature = "tracing")]
                tracing::warn!("Resource cleanup failed: {:?}", rel_err);
                #[cfg(not(feature = "tracing"))]
                eprintln!("Resource cleanup failed: {:?}", rel_err);
            }
        }

        match use_result {
            Ok(outcome) => outcome,
            // Re-raise the original panic now that cleanup has run.
            Err(panic_payload) => std::panic::resume_unwind(panic_payload),
        }
    }
}
pub fn bracket_sync<Acquire, Use, Release, UseEffect, R, T, E, Env>(
acquire: Acquire,
release: Release,
use_fn: Use,
) -> BracketSync<Acquire, Use, Release>
where
Acquire: Effect<Output = R, Error = E, Env = Env>,
Use: FnOnce(&R) -> UseEffect + Send + std::panic::UnwindSafe,
UseEffect: Effect<Output = T, Error = E, Env = Env>,
Release: FnOnce(R) -> Result<(), E> + Send,
R: Send + std::panic::UnwindSafe,
T: Send,
E: Send + std::fmt::Debug,
Env: Clone + Send + Sync + std::panic::RefUnwindSafe,
{
BracketSync::new(acquire, use_fn, release)
}
/// Bracket over two resources: both are acquired in order, used together,
/// and released in reverse order of acquisition.
pub struct Bracket2<Acq1, Acq2, Use, Rel1, Rel2> {
/// Effect that produces the first resource.
acquire1: Acq1,
/// Effect that produces the second resource.
acquire2: Acq2,
/// Function that builds the use effect from references to both resources.
use_fn: Use,
/// Cleanup for the first resource.
release1: Rel1,
/// Cleanup for the second resource.
release2: Rel2,
}
/// Debug output that prints fixed placeholders, so no `Debug` bound is
/// needed on the type parameters.
impl<Acq1, Acq2, Use, Rel1, Rel2> std::fmt::Debug for Bracket2<Acq1, Acq2, Use, Rel1, Rel2> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Bracket2");
        out.field("acquire1", &"<effect>");
        out.field("acquire2", &"<effect>");
        out.field("use_fn", &"<function>");
        out.field("release1", &"<function>");
        out.field("release2", &"<function>");
        out.finish()
    }
}
impl<Acq1, Acq2, Use, Rel1, Rel2, UseEffect, R1, R2, T, E, Env, RelFut1, RelFut2> Effect
    for Bracket2<Acq1, Acq2, Use, Rel1, Rel2>
where
    Acq1: Effect<Output = R1, Error = E, Env = Env>,
    Acq2: Effect<Output = R2, Error = E, Env = Env>,
    Use: FnOnce(&R1, &R2) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    Rel1: FnOnce(R1) -> RelFut1 + Send,
    RelFut1: Future<Output = Result<(), E>> + Send,
    Rel2: FnOnce(R2) -> RelFut2 + Send,
    RelFut2: Future<Output = Result<(), E>> + Send,
    R1: Send,
    R2: Send,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Acquires both resources, runs the use effect, then releases the
    /// second resource followed by the first.
    ///
    /// If the second acquire fails, the first resource is released before
    /// the acquire error is returned. Release errors are only logged; the
    /// returned value is the acquire error or the use result.
    async fn run(self, env: &Self::Env) -> Result<T, E> {
        let first = self.acquire1.run(env).await?;

        let second = match self.acquire2.run(env).await {
            Err(acquire_failure) => {
                // Roll back the resource that was already acquired.
                let rollback = (self.release1)(first).await;
                if let Err(cleanup_failure) = &rollback {
                    #[cfg(feature = "tracing")]
                    tracing::warn!("Resource cleanup failed: {:?}", cleanup_failure);
                    #[cfg(not(feature = "tracing"))]
                    eprintln!("Resource cleanup failed: {:?}", cleanup_failure);
                }
                return Err(acquire_failure);
            }
            Ok(second) => second,
        };

        let outcome = (self.use_fn)(&first, &second).run(env).await;

        // Release in reverse order of acquisition.
        let second_cleanup = (self.release2)(second).await;
        if let Err(cleanup_failure) = &second_cleanup {
            #[cfg(feature = "tracing")]
            tracing::warn!("Resource cleanup failed: {:?}", cleanup_failure);
            #[cfg(not(feature = "tracing"))]
            eprintln!("Resource cleanup failed: {:?}", cleanup_failure);
        }

        let first_cleanup = (self.release1)(first).await;
        if let Err(cleanup_failure) = &first_cleanup {
            #[cfg(feature = "tracing")]
            tracing::warn!("Resource cleanup failed: {:?}", cleanup_failure);
            #[cfg(not(feature = "tracing"))]
            eprintln!("Resource cleanup failed: {:?}", cleanup_failure);
        }

        outcome
    }
}
/// Builds a [`Bracket2`] effect over two resources.
///
/// Parameters are grouped as acquires, then releases, then the use
/// function: `(acquire1, acquire2, release1, release2, use_fn)`.
pub fn bracket2<Acq1, Acq2, Use, Rel1, Rel2, UseEffect, R1, R2, T, E, Env, RelFut1, RelFut2>(
    acquire1: Acq1,
    acquire2: Acq2,
    release1: Rel1,
    release2: Rel2,
    use_fn: Use,
) -> Bracket2<Acq1, Acq2, Use, Rel1, Rel2>
where
    Acq1: Effect<Output = R1, Error = E, Env = Env>,
    Rel1: FnOnce(R1) -> RelFut1 + Send,
    RelFut1: Future<Output = Result<(), E>> + Send,
    Acq2: Effect<Output = R2, Error = E, Env = Env>,
    Rel2: FnOnce(R2) -> RelFut2 + Send,
    RelFut2: Future<Output = Result<(), E>> + Send,
    Use: FnOnce(&R1, &R2) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    R1: Send,
    R2: Send,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync,
{
    Bracket2 {
        acquire1,
        release1,
        acquire2,
        release2,
        use_fn,
    }
}
/// Bracket over three resources: acquired in order, used together, and
/// released in reverse order of acquisition.
pub struct Bracket3<Acq1, Acq2, Acq3, Use, Rel1, Rel2, Rel3> {
/// Effect that produces the first resource.
acquire1: Acq1,
/// Effect that produces the second resource.
acquire2: Acq2,
/// Effect that produces the third resource.
acquire3: Acq3,
/// Function that builds the use effect from references to all three resources.
use_fn: Use,
/// Cleanup for the first resource.
release1: Rel1,
/// Cleanup for the second resource.
release2: Rel2,
/// Cleanup for the third resource.
release3: Rel3,
}
/// Debug output that prints fixed placeholders, so no `Debug` bound is
/// needed on the type parameters.
impl<Acq1, Acq2, Acq3, Use, Rel1, Rel2, Rel3> std::fmt::Debug
    for Bracket3<Acq1, Acq2, Acq3, Use, Rel1, Rel2, Rel3>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Bracket3");
        // Field name paired with the placeholder printed for it, in output order.
        for (name, placeholder) in [
            ("acquire1", "<effect>"),
            ("acquire2", "<effect>"),
            ("acquire3", "<effect>"),
            ("use_fn", "<function>"),
            ("release1", "<function>"),
            ("release2", "<function>"),
            ("release3", "<function>"),
        ] {
            out.field(name, &placeholder);
        }
        out.finish()
    }
}
impl<
        Acq1,
        Acq2,
        Acq3,
        Use,
        Rel1,
        Rel2,
        Rel3,
        UseEffect,
        R1,
        R2,
        R3,
        T,
        E,
        Env,
        RelFut1,
        RelFut2,
        RelFut3,
    > Effect for Bracket3<Acq1, Acq2, Acq3, Use, Rel1, Rel2, Rel3>
where
    Acq1: Effect<Output = R1, Error = E, Env = Env>,
    Acq2: Effect<Output = R2, Error = E, Env = Env>,
    Acq3: Effect<Output = R3, Error = E, Env = Env>,
    Use: FnOnce(&R1, &R2, &R3) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    Rel1: FnOnce(R1) -> RelFut1 + Send,
    RelFut1: Future<Output = Result<(), E>> + Send,
    Rel2: FnOnce(R2) -> RelFut2 + Send,
    RelFut2: Future<Output = Result<(), E>> + Send,
    Rel3: FnOnce(R3) -> RelFut3 + Send,
    RelFut3: Future<Output = Result<(), E>> + Send,
    R1: Send,
    R2: Send,
    R3: Send,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Acquires three resources, runs the use effect, then releases the
    /// resources in reverse order of acquisition (third, second, first).
    ///
    /// If a later acquire fails, every resource acquired so far is released
    /// (again in reverse order) before the acquire error is returned.
    /// Release errors never replace the returned value, but they are always
    /// logged, including on the rollback paths, so a failed rollback does
    /// not disappear silently.
    async fn run(self, env: &Self::Env) -> Result<T, E> {
        // Single place for the cleanup-failure log line used by every path.
        fn report_cleanup_failure<Failure: std::fmt::Debug>(rel_err: &Failure) {
            #[cfg(feature = "tracing")]
            tracing::warn!("Resource cleanup failed: {:?}", rel_err);
            #[cfg(not(feature = "tracing"))]
            eprintln!("Resource cleanup failed: {:?}", rel_err);
        }

        let r1 = self.acquire1.run(env).await?;

        let r2 = match self.acquire2.run(env).await {
            Ok(r) => r,
            Err(e) => {
                // Roll back the first resource before reporting the failure.
                let rollback1 = (self.release1)(r1).await;
                if let Err(ref rel_err) = rollback1 {
                    report_cleanup_failure(rel_err);
                }
                return Err(e);
            }
        };

        let r3 = match self.acquire3.run(env).await {
            Ok(r) => r,
            Err(e) => {
                // Roll back in reverse order of acquisition.
                let rollback2 = (self.release2)(r2).await;
                if let Err(ref rel_err) = rollback2 {
                    report_cleanup_failure(rel_err);
                }
                let rollback1 = (self.release1)(r1).await;
                if let Err(ref rel_err) = rollback1 {
                    report_cleanup_failure(rel_err);
                }
                return Err(e);
            }
        };

        let result = (self.use_fn)(&r1, &r2, &r3).run(env).await;

        let rel3_result = (self.release3)(r3).await;
        if let Err(ref rel_err) = rel3_result {
            report_cleanup_failure(rel_err);
        }
        let rel2_result = (self.release2)(r2).await;
        if let Err(ref rel_err) = rel2_result {
            report_cleanup_failure(rel_err);
        }
        let rel1_result = (self.release1)(r1).await;
        if let Err(ref rel_err) = rel1_result {
            report_cleanup_failure(rel_err);
        }

        result
    }
}
/// Builds a [`Bracket3`] effect over three resources.
///
/// Parameters are grouped as acquires, then releases, then the use
/// function: `(acquire1, acquire2, acquire3, release1, release2, release3, use_fn)`.
pub fn bracket3<
    Acq1,
    Acq2,
    Acq3,
    Use,
    Rel1,
    Rel2,
    Rel3,
    UseEffect,
    R1,
    R2,
    R3,
    T,
    E,
    Env,
    RelFut1,
    RelFut2,
    RelFut3,
>(
    acquire1: Acq1,
    acquire2: Acq2,
    acquire3: Acq3,
    release1: Rel1,
    release2: Rel2,
    release3: Rel3,
    use_fn: Use,
) -> Bracket3<Acq1, Acq2, Acq3, Use, Rel1, Rel2, Rel3>
where
    Acq1: Effect<Output = R1, Error = E, Env = Env>,
    Rel1: FnOnce(R1) -> RelFut1 + Send,
    RelFut1: Future<Output = Result<(), E>> + Send,
    Acq2: Effect<Output = R2, Error = E, Env = Env>,
    Rel2: FnOnce(R2) -> RelFut2 + Send,
    RelFut2: Future<Output = Result<(), E>> + Send,
    Acq3: Effect<Output = R3, Error = E, Env = Env>,
    Rel3: FnOnce(R3) -> RelFut3 + Send,
    RelFut3: Future<Output = Result<(), E>> + Send,
    Use: FnOnce(&R1, &R2, &R3) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    R1: Send,
    R2: Send,
    R3: Send,
    T: Send,
    E: Send + std::fmt::Debug,
    Env: Clone + Send + Sync,
{
    Bracket3 {
        acquire1,
        release1,
        acquire2,
        release2,
        acquire3,
        release3,
        use_fn,
    }
}
/// Type-erased pairing of an acquire step and a release step for a value of type `T`.
///
/// Both steps are stored as boxed one-shot closures returning boxed futures,
/// so resources built from different effect types share one concrete type.
pub struct Resource<T, E, Env>
where
T: Send + 'static,
E: Send + 'static,
Env: Clone + Send + Sync + 'static,
{
/// One-shot closure that acquires the resource for a given environment.
// The boxed closure type is inherently long; the lint is silenced for this field.
#[allow(clippy::type_complexity)]
acquire: Box<dyn FnOnce(&Env) -> BoxFuture<'static, Result<T, E>> + Send>,
/// One-shot closure that consumes the resource and cleans it up.
release: Box<dyn FnOnce(T) -> BoxFuture<'static, Result<(), E>> + Send>,
}
/// Debug output that prints fixed placeholders for the boxed closures.
impl<T, E, Env> std::fmt::Debug for Resource<T, E, Env>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Resource");
        out.field("acquire", &"<effect>");
        out.field("release", &"<function>");
        out.finish()
    }
}
impl<T, E, Env> Resource<T, E, Env>
where
    T: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
{
    /// Creates a resource from an acquire effect and an async release function.
    ///
    /// The environment is cloned when the acquire step is invoked so that the
    /// boxed future can be `'static`.
    pub fn new<Acq, Rel, RelFut>(acquire: Acq, release: Rel) -> Self
    where
        Acq: Effect<Output = T, Error = E, Env = Env> + 'static,
        Rel: FnOnce(T) -> RelFut + Send + 'static,
        RelFut: Future<Output = Result<(), E>> + Send + 'static,
    {
        Resource {
            acquire: Box::new(move |env: &Env| {
                let env = env.clone();
                Box::pin(async move { acquire.run(&env).await })
            }),
            release: Box::new(move |t| Box::pin(release(t))),
        }
    }

    /// Pairs this resource with a use function, producing a runnable effect
    /// that acquires, uses, and then releases the resource.
    pub fn with<U, F, UseEffect>(self, f: F) -> ResourceWith<T, U, E, Env, F>
    where
        U: Send + 'static,
        F: FnOnce(&T) -> UseEffect + Send + 'static,
        UseEffect: Effect<Output = U, Error = E, Env = Env>,
    {
        ResourceWith {
            resource: self,
            use_fn: f,
            _marker: PhantomData,
        }
    }

    /// Combines two resources into one that yields the pair `(T, T2)`.
    ///
    /// * Acquisition runs `self` first, then `other`. If `other` fails to
    ///   acquire, the value from `self` is released (a failure of that
    ///   rollback is logged) and the acquire error is returned.
    /// * Release runs `other`'s release first, then `self`'s. Both are always
    ///   attempted. If exactly one fails, its error is returned. If both
    ///   fail, `other`'s error is returned and `self`'s error is logged so it
    ///   is not lost.
    ///
    /// # Panics
    ///
    /// The generated closures panic if a release function has already been
    /// taken or if the internal mutex is poisoned.
    pub fn both<T2>(self, other: Resource<T2, E, Env>) -> Resource<(T, T2), E, Env>
    where
        T2: Send + 'static,
    {
        use std::sync::Arc;
        let acquire1 = self.acquire;
        // `release1` is needed by both generated closures (rollback inside
        // acquire, and the normal release), so it lives in a shared slot from
        // which whichever path runs takes it exactly once.
        let release1 = Arc::new(std::sync::Mutex::new(Some(self.release)));
        let acquire2 = other.acquire;
        let release2 = Arc::new(std::sync::Mutex::new(Some(other.release)));
        let release1_for_acquire = release1.clone();
        let release1_for_release = release1;
        let release2_for_release = release2;
        Resource {
            acquire: Box::new(move |env: &Env| {
                let env = env.clone();
                Box::pin(async move {
                    let t1 = acquire1(&env).await?;
                    match acquire2(&env).await {
                        Ok(t2) => Ok((t1, t2)),
                        Err(acquire_err) => {
                            // The guard is a temporary of this statement, so
                            // the lock is not held across the await below.
                            let release1 = release1_for_acquire
                                .lock()
                                .unwrap()
                                .take()
                                .expect("release1 already taken");
                            if let Err(cleanup_err) = release1(t1).await {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "Cleanup failed during partial acquisition rollback: {:?}",
                                    cleanup_err
                                );
                                #[cfg(not(feature = "tracing"))]
                                eprintln!(
                                    "Cleanup failed during partial acquisition rollback: {:?}",
                                    cleanup_err
                                );
                            }
                            Err(acquire_err)
                        }
                    }
                })
            }),
            release: Box::new(move |(t1, t2): (T, T2)| {
                Box::pin(async move {
                    let release2 = release2_for_release
                        .lock()
                        .unwrap()
                        .take()
                        .expect("release2 already taken");
                    let release1 = release1_for_release
                        .lock()
                        .unwrap()
                        .take()
                        .expect("release1 already taken");
                    // Reverse order of acquisition; both are always attempted.
                    let r2 = release2(t2).await;
                    let r1 = release1(t1).await;
                    match (r2, r1) {
                        (Ok(()), Ok(())) => Ok(()),
                        (Ok(()), Err(first_err)) => Err(first_err),
                        (Err(second_err), Ok(())) => Err(second_err),
                        (Err(second_err), Err(first_err)) => {
                            // Only one error can be returned; log the other
                            // instead of dropping it silently.
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Resource cleanup failed: {:?}", first_err);
                            #[cfg(not(feature = "tracing"))]
                            eprintln!("Resource cleanup failed: {:?}", first_err);
                            Err(second_err)
                        }
                    }
                })
            }),
        }
    }
}
/// Effect formed by pairing a [`Resource`] with a function that uses it.
pub struct ResourceWith<T, U, E, Env, F>
where
T: Send + 'static,
E: Send + 'static,
Env: Clone + Send + Sync + 'static,
{
/// The resource to acquire and release around the use function.
resource: Resource<T, E, Env>,
/// Function that builds the use effect from a reference to the resource.
use_fn: F,
/// Ties the output type `U` to this struct without storing a value of it.
_marker: PhantomData<U>,
}
/// Debug output that prints fixed placeholders for the resource and closure.
impl<T, U, E, Env, F> std::fmt::Debug for ResourceWith<T, U, E, Env, F>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ResourceWith");
        out.field("resource", &"<resource>");
        out.field("use_fn", &"<function>");
        out.finish()
    }
}
impl<T, U, E, Env, F, UseEffect> Effect for ResourceWith<T, U, E, Env, F>
where
    T: Send + 'static,
    U: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
    F: FnOnce(&T) -> UseEffect + Send + 'static,
    UseEffect: Effect<Output = U, Error = E, Env = Env>,
{
    type Output = U;
    type Error = E;
    type Env = Env;

    /// Acquires the resource, runs the use effect, then releases the resource.
    ///
    /// An acquire failure is returned immediately. Otherwise the release step
    /// runs regardless of the use result; a release error is logged and the
    /// use result is returned unchanged.
    async fn run(self, env: &Self::Env) -> Result<U, E> {
        let acquired = match (self.resource.acquire)(env).await {
            Ok(acquired) => acquired,
            Err(acquire_failure) => return Err(acquire_failure),
        };

        let outcome = (self.use_fn)(&acquired).run(env).await;

        let cleanup = (self.resource.release)(acquired).await;
        if let Err(cleanup_failure) = &cleanup {
            #[cfg(feature = "tracing")]
            tracing::warn!("Resource cleanup failed: {:?}", cleanup_failure);
            #[cfg(not(feature = "tracing"))]
            eprintln!("Resource cleanup failed: {:?}", cleanup_failure);
        }

        outcome
    }
}
/// Builder that accumulates resources (via `and`) before a use function is attached.
pub struct Acquiring<T, E, Env>
where
T: Send + 'static,
E: Send + 'static,
Env: Clone + Send + Sync + 'static,
{
/// The resource (possibly a nested pair of resources) collected so far.
resource: Resource<T, E, Env>,
}
/// Debug output that prints a fixed placeholder for the wrapped resource.
impl<T, E, Env> std::fmt::Debug for Acquiring<T, E, Env>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Acquiring");
        out.field("resource", &"<resource>");
        out.finish()
    }
}
/// Starts an [`Acquiring`] builder from a single acquire effect and its
/// async release function.
pub fn acquiring<R, E, Env, Acq, Rel, RelFut>(acquire: Acq, release: Rel) -> Acquiring<R, E, Env>
where
    R: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
    Acq: Effect<Output = R, Error = E, Env = Env> + 'static,
    Rel: FnOnce(R) -> RelFut + Send + 'static,
    RelFut: Future<Output = Result<(), E>> + Send + 'static,
{
    let resource = Resource::new(acquire, release);
    Acquiring { resource }
}
impl<T, E, Env> Acquiring<T, E, Env>
where
    T: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
{
    /// Adds another resource to the builder.
    ///
    /// The accumulated value becomes the pair `(T, T2)`, so chaining `and`
    /// repeatedly yields left-nested tuples such as `((A, B), C)`.
    pub fn and<T2, Acq, Rel, RelFut>(self, acquire: Acq, release: Rel) -> Acquiring<(T, T2), E, Env>
    where
        T2: Send + 'static,
        Acq: Effect<Output = T2, Error = E, Env = Env> + 'static,
        Rel: FnOnce(T2) -> RelFut + Send + 'static,
        RelFut: Future<Output = Result<(), E>> + Send + 'static,
    {
        let next = Resource::new(acquire, release);
        let combined = self.resource.both(next);
        Acquiring { resource: combined }
    }

    /// Attaches the use function, producing a runnable effect.
    pub fn with<U, F, UseEffect>(self, f: F) -> ResourceWith<T, U, E, Env, F>
    where
        U: Send + 'static,
        F: FnOnce(&T) -> UseEffect + Send + 'static,
        UseEffect: Effect<Output = U, Error = E, Env = Env>,
    {
        let Acquiring { resource } = self;
        resource.with(f)
    }
}
impl<A, B, E, Env> Acquiring<(A, B), E, Env>
where
    A: Send + 'static,
    B: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
{
    /// Like `with`, but the use function receives the two resources as
    /// separate arguments instead of a reference to the pair.
    #[allow(clippy::type_complexity)]
    pub fn with_flat2<U, F, UseEffect>(
        self,
        f: F,
    ) -> ResourceWith<(A, B), U, E, Env, impl FnOnce(&(A, B)) -> UseEffect + Send + 'static>
    where
        U: Send + 'static,
        F: FnOnce(&A, &B) -> UseEffect + Send + 'static,
        UseEffect: Effect<Output = U, Error = E, Env = Env>,
    {
        self.resource.with(move |pair: &(A, B)| {
            let (a, b) = pair;
            f(a, b)
        })
    }
}
impl<A, B, C, E, Env> Acquiring<((A, B), C), E, Env>
where
    A: Send + 'static,
    B: Send + 'static,
    C: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
{
    /// Like `with`, but unpacks the left-nested tuple `((A, B), C)` so the
    /// use function receives the three resources as separate arguments.
    #[allow(clippy::type_complexity)]
    pub fn with_flat3<U, F, UseEffect>(
        self,
        f: F,
    ) -> ResourceWith<((A, B), C), U, E, Env, impl FnOnce(&((A, B), C)) -> UseEffect + Send + 'static>
    where
        U: Send + 'static,
        F: FnOnce(&A, &B, &C) -> UseEffect + Send + 'static,
        UseEffect: Effect<Output = U, Error = E, Env = Env>,
    {
        self.resource.with(move |nested: &((A, B), C)| {
            let ((a, b), c) = nested;
            f(a, b, c)
        })
    }
}
impl<A, B, C, D, E, Env> Acquiring<(((A, B), C), D), E, Env>
where
    A: Send + 'static,
    B: Send + 'static,
    C: Send + 'static,
    D: Send + 'static,
    E: Send + std::fmt::Debug + 'static,
    Env: Clone + Send + Sync + 'static,
{
    /// Like `with`, but unpacks the left-nested tuple `(((A, B), C), D)` so
    /// the use function receives the four resources as separate arguments.
    #[allow(clippy::type_complexity)]
    pub fn with_flat4<U, F, UseEffect>(
        self,
        f: F,
    ) -> ResourceWith<
        (((A, B), C), D),
        U,
        E,
        Env,
        impl FnOnce(&(((A, B), C), D)) -> UseEffect + Send + 'static,
    >
    where
        U: Send + 'static,
        F: FnOnce(&A, &B, &C, &D) -> UseEffect + Send + 'static,
        UseEffect: Effect<Output = U, Error = E, Env = Env>,
    {
        self.resource.with(move |nested: &(((A, B), C), D)| {
            let (((a, b), c), d) = nested;
            f(a, b, c, d)
        })
    }
}
/// Legacy bracket with a synchronous, infallible release function.
///
/// Unlike `bracket`, the parameters are ordered `(acquire, use_fn,
/// release_fn)`, the use function takes the resource by value (it receives a
/// clone, hence `R: Clone`), and the release function returns nothing.
#[deprecated(
    since = "0.12.0",
    note = "Use `bracket` with async release function instead"
)]
pub fn bracket_simple<Acquire, Use, ReleaseFn, UseEffect, R, T, E, Env>(
    acquire: Acquire,
    use_fn: Use,
    release_fn: ReleaseFn,
) -> impl Effect<Output = T, Error = E, Env = Env>
where
    Acquire: Effect<Output = R, Error = E, Env = Env>,
    ReleaseFn: FnOnce(R) + Send,
    Use: FnOnce(R) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    R: Clone + Send,
    T: Send,
    E: Send,
    Env: Clone + Send + Sync,
{
    BracketSimple {
        release_fn,
        use_fn,
        acquire,
    }
}
/// Private effect type returned (as `impl Effect`) by `bracket_simple`.
struct BracketSimple<Acquire, Use, ReleaseFn> {
/// Effect that produces the resource.
acquire: Acquire,
/// Function that builds the use effect from an owned copy of the resource.
use_fn: Use,
/// Synchronous function that consumes the resource once the use effect is done.
release_fn: ReleaseFn,
}
impl<Acquire, Use, ReleaseFn, UseEffect, R, T, E, Env> Effect
    for BracketSimple<Acquire, Use, ReleaseFn>
where
    Acquire: Effect<Output = R, Error = E, Env = Env>,
    Use: FnOnce(R) -> UseEffect + Send,
    UseEffect: Effect<Output = T, Error = E, Env = Env>,
    ReleaseFn: FnOnce(R) + Send,
    R: Clone + Send,
    T: Send,
    E: Send,
    Env: Clone + Send + Sync,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Acquires the resource, runs the use effect on a clone of it, then
    /// passes the original to the release function.
    ///
    /// An acquire failure is returned immediately; otherwise the release
    /// function runs whether the use effect succeeded or failed, and the use
    /// result is returned.
    async fn run(self, env: &Self::Env) -> Result<T, E> {
        let resource = self.acquire.run(env).await?;
        // The use function takes ownership, so it gets a clone; the original
        // is kept for the release function.
        let handed_to_use = resource.clone();
        let outcome = (self.use_fn)(handed_to_use).run(env).await;
        (self.release_fn)(resource);
        outcome
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::effect::constructors::{fail, pure};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
// When acquire fails, `bracket` returns the acquire error and the release
// closure (second argument) is never invoked.
#[tokio::test]
async fn bracket_returns_error_on_acquire_failure() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = bracket(
fail::<i32, String, ()>("acquire failed".to_string()),
move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
},
|val: &i32| pure::<_, String, ()>(*val * 2),
)
.run(&())
.await;
assert_eq!(result, Err("acquire failed".to_string()));
assert!(
!released.load(Ordering::SeqCst),
"cleanup must NOT run when acquire fails"
);
}
// Happy path: the use result is returned and the release closure has run.
#[tokio::test]
async fn bracket_releases_on_success() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = bracket(
pure::<_, String, ()>(42),
move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
},
|val: &i32| pure::<_, String, ()>(*val * 2),
)
.run(&())
.await;
assert_eq!(result, Ok(84));
assert!(released.load(Ordering::SeqCst));
}
// A failing use effect still triggers the release closure, and the use error
// is what the caller receives.
#[tokio::test]
async fn bracket_releases_on_use_failure() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = bracket(
pure::<_, String, ()>(42),
move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
},
|_: &i32| fail::<i32, String, ()>("use failed".to_string()),
)
.run(&())
.await;
assert_eq!(result, Err("use failed".to_string()));
assert!(
released.load(Ordering::SeqCst),
"cleanup must run on failure"
);
}
// A failing release does not override a successful use result; only the
// returned value is asserted here, not the log output.
#[tokio::test]
async fn bracket_logs_cleanup_error_returns_use_result() {
let result = bracket(
pure::<_, String, ()>(42),
|_: i32| async { Err::<(), String>("cleanup failed".to_string()) },
|val: &i32| pure::<_, String, ()>(*val * 2),
)
.run(&())
.await;
assert_eq!(
result,
Ok(84),
"use result returned despite cleanup failure"
);
}
// Records the order in which the two release closures run and checks that
// the second resource is released before the first.
#[tokio::test]
async fn bracket2_releases_in_lifo_order() {
let order = Arc::new(std::sync::Mutex::new(Vec::new()));
let order1 = order.clone();
let order2 = order.clone();
let result = bracket2(
pure::<_, String, ()>("first"),
pure::<_, String, ()>("second"),
move |_: &str| {
order1.lock().unwrap().push("release_first");
async { Ok(()) }
},
move |_: &str| {
order2.lock().unwrap().push("release_second");
async { Ok(()) }
},
|_: &&str, _: &&str| pure::<_, String, ()>("done"),
)
.run(&())
.await;
assert_eq!(result, Ok("done"));
let releases = order.lock().unwrap();
assert_eq!(*releases, vec!["release_second", "release_first"]);
}
// If the second acquire fails, the already-acquired first resource must be
// released and the overall result is an error.
#[tokio::test]
async fn bracket2_releases_first_if_second_acquire_fails() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = bracket2(
pure::<_, String, ()>("first"),
fail::<&str, String, ()>("acquire2 failed".to_string()),
move |_: &str| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
},
|_: &str| async { Ok(()) },
|_: &&str, _: &&str| pure::<_, String, ()>("done"),
)
.run(&())
.await;
assert!(result.is_err());
assert!(
released.load(Ordering::SeqCst),
"first resource must be released when second acquire fails"
);
}
// Use and cleanup both fail: `bracket_full` must report `BracketError::Both`
// carrying each error in its own field.
#[tokio::test]
async fn bracket_full_returns_both_errors() {
let result = bracket_full(
pure::<_, String, ()>(42),
|_: i32| async { Err::<(), String>("cleanup failed".to_string()) },
|_: &i32| fail::<i32, String, ()>("use failed".to_string()),
)
.run(&())
.await;
let err = result.unwrap_err();
match err {
BracketError::Both {
use_error,
cleanup_error,
} => {
assert_eq!(use_error, "use failed");
assert_eq!(cleanup_error, "cleanup failed");
}
_ => panic!("expected BracketError::Both"),
}
}
// Only the use phase fails: the error must be `BracketError::UseError`.
#[tokio::test]
async fn bracket_full_returns_use_error_only() {
let result = bracket_full(
pure::<_, String, ()>(42),
|_: i32| async { Ok(()) },
|_: &i32| fail::<i32, String, ()>("use failed".to_string()),
)
.run(&())
.await;
let err = result.unwrap_err();
match err {
BracketError::UseError(e) => assert_eq!(e, "use failed"),
_ => panic!("expected BracketError::UseError"),
}
}
// Only cleanup fails: even though the use effect succeeded, the result is
// `BracketError::CleanupError`.
#[tokio::test]
async fn bracket_full_returns_cleanup_error_only() {
let result = bracket_full(
pure::<_, String, ()>(42),
|_: i32| async { Err::<(), String>("cleanup failed".to_string()) },
|_: &i32| pure::<i32, String, ()>(84),
)
.run(&())
.await;
let err = result.unwrap_err();
match err {
BracketError::CleanupError(e) => assert_eq!(e, "cleanup failed"),
_ => panic!("expected BracketError::CleanupError"),
}
}
// Acquire fails: the error must be `BracketError::AcquireError`.
#[tokio::test]
async fn bracket_full_returns_acquire_error() {
let result = bracket_full(
fail::<i32, String, ()>("acquire failed".to_string()),
|_: i32| async { Ok(()) },
|_: &i32| pure::<i32, String, ()>(42),
)
.run(&())
.await;
let err = result.unwrap_err();
match err {
BracketError::AcquireError(e) => assert_eq!(e, "acquire failed"),
_ => panic!("expected BracketError::AcquireError"),
}
}
// `Resource::new(..).with(..)` returns the use result and runs the release
// closure.
#[tokio::test]
async fn resource_use_guarantees_cleanup() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let resource = Resource::new(pure::<_, String, ()>(42), move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
});
let result = resource
.with(|val: &i32| pure::<_, String, ()>(*val * 2))
.run(&())
.await;
assert_eq!(result, Ok(84));
assert!(released.load(Ordering::SeqCst));
}
// The `acquiring(..).with(..)` builder with one resource behaves like a
// plain bracket: result returned, release closure run.
#[tokio::test]
async fn acquiring_builder_single_resource() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = acquiring(pure::<_, String, ()>(42), move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
})
.with(|val: &i32| pure::<_, String, ()>(*val * 2))
.run(&())
.await;
assert_eq!(result, Ok(84));
assert!(released.load(Ordering::SeqCst));
}
// Three resources chained with `and`: the use closure sees the left-nested
// tuple `((first, second), third)` and releases run in reverse order.
#[tokio::test]
async fn acquiring_builder_multiple_resources() {
let order = Arc::new(std::sync::Mutex::new(Vec::new()));
let order1 = order.clone();
let order2 = order.clone();
let order3 = order.clone();
let result = acquiring(pure::<_, String, ()>("first"), move |_: &str| {
order1.lock().unwrap().push("release_first");
async { Ok(()) }
})
.and(pure::<_, String, ()>("second"), move |_: &str| {
order2.lock().unwrap().push("release_second");
async { Ok(()) }
})
.and(pure::<_, String, ()>("third"), move |_: &str| {
order3.lock().unwrap().push("release_third");
async { Ok(()) }
})
.with(|((first, second), third): &((&str, &str), &str)| {
assert_eq!(*first, "first");
assert_eq!(*second, "second");
assert_eq!(*third, "third");
pure::<_, String, ()>("done")
})
.run(&())
.await;
assert_eq!(result, Ok("done"));
let releases = order.lock().unwrap();
assert_eq!(
*releases,
vec!["release_third", "release_second", "release_first"]
);
}
// `with_flat2` passes the two resources as separate arguments.
#[tokio::test]
async fn acquiring_builder_with_flat2_two_resources() {
let result = acquiring(pure::<_, String, ()>(10), |_: i32| async { Ok(()) })
.and(pure::<_, String, ()>(20), |_: i32| async { Ok(()) })
.with_flat2(|a: &i32, b: &i32| pure::<_, String, ()>(*a + *b))
.run(&())
.await;
assert_eq!(result, Ok(30));
}
// `with_flat3` passes the three resources as separate arguments.
#[tokio::test]
async fn acquiring_builder_with_flat3_three_resources() {
let result = acquiring(pure::<_, String, ()>(1), |_: i32| async { Ok(()) })
.and(pure::<_, String, ()>(2), |_: i32| async { Ok(()) })
.and(pure::<_, String, ()>(3), |_: i32| async { Ok(()) })
.with_flat3(|a: &i32, b: &i32, c: &i32| pure::<_, String, ()>(a + b + c))
.run(&())
.await;
assert_eq!(result, Ok(6));
}
// When the second resource in an `and` chain fails to acquire, the first
// resource's release closure must still run and the result is an error.
#[tokio::test]
async fn acquiring_builder_releases_on_partial_acquire_failure() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = acquiring(pure::<_, String, ()>("first"), move |_: &str| {
released_clone.store(true, Ordering::SeqCst);
async { Ok(()) }
})
.and(
fail::<&str, String, ()>("second acquire failed".to_string()),
|_: &str| async { Ok(()) },
)
.with(|(first, second): &(&str, &str)| {
pure::<_, String, ()>(format!("{} {}", first, second))
})
.run(&())
.await;
assert!(result.is_err());
assert!(
released.load(Ordering::SeqCst),
"first resource must be released when second acquire fails"
);
}
// `bracket_sync` with a synchronous release closure: use result returned,
// release closure run.
#[tokio::test]
async fn bracket_sync_releases_on_success() {
let released = Arc::new(AtomicBool::new(false));
let released_clone = released.clone();
let result = bracket_sync(
pure::<_, String, ()>(42),
move |_: i32| {
released_clone.store(true, Ordering::SeqCst);
Ok(())
},
|val: &i32| pure::<_, String, ()>(*val * 2),
)
.run(&())
.await;
assert_eq!(result, Ok(84));
assert!(released.load(Ordering::SeqCst));
}
/// `bracket_sync` must still invoke the release closure when the use effect
/// fails, and the use error is what the caller observes.
#[tokio::test]
async fn bracket_sync_releases_on_use_failure() {
    let released = Arc::new(AtomicBool::new(false));
    let release_flag = Arc::clone(&released);
    let effect = bracket_sync(
        pure::<_, String, ()>(42),
        move |_: i32| {
            release_flag.store(true, Ordering::SeqCst);
            Ok(())
        },
        |_: &i32| fail::<i32, String, ()>("use failed".to_string()),
    );
    let outcome = effect.run(&()).await;
    assert_eq!(outcome, Err("use failed".to_string()));
    assert!(
        released.load(Ordering::SeqCst),
        "cleanup must run on failure"
    );
}
/// `bracket3` releases its three resources in reverse acquisition order:
/// third, then second, then first.
#[tokio::test]
async fn bracket3_releases_in_lifo_order() {
    // Shared log that each release closure appends its tag to.
    let release_log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let log_first = Arc::clone(&release_log);
    let log_second = Arc::clone(&release_log);
    let log_third = Arc::clone(&release_log);
    let effect = bracket3(
        pure::<_, String, ()>("first"),
        pure::<_, String, ()>("second"),
        pure::<_, String, ()>("third"),
        move |_: &str| {
            log_first.lock().unwrap().push("release_first");
            async { Ok(()) }
        },
        move |_: &str| {
            log_second.lock().unwrap().push("release_second");
            async { Ok(()) }
        },
        move |_: &str| {
            log_third.lock().unwrap().push("release_third");
            async { Ok(()) }
        },
        |_: &&str, _: &&str, _: &&str| pure::<_, String, ()>("done"),
    );
    let outcome = effect.run(&()).await;
    assert_eq!(outcome, Ok("done"));
    assert_eq!(
        *release_log.lock().unwrap(),
        vec!["release_third", "release_second", "release_first"]
    );
}
/// Checks the `Display` output of every `BracketError` variant.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn bracket_error_display() {
    // (error value, expected rendered text)
    let cases: [(BracketError<&str>, &str); 4] = [
        (BracketError::AcquireError("failed"), "acquire failed: failed"),
        // A use error is rendered as the bare inner error, with no prefix.
        (BracketError::UseError("failed"), "failed"),
        (BracketError::CleanupError("failed"), "cleanup failed: failed"),
        (
            BracketError::Both {
                use_error: "use failed",
                cleanup_error: "cleanup failed",
            },
            "use failed: use failed; cleanup also failed: cleanup failed",
        ),
    ];
    for (err, expected) in cases {
        assert_eq!(err.to_string(), expected);
    }
}
/// Verifies which of `acquire_error`, `use_error` and `cleanup_error`
/// return `Some` for each `BracketError` variant.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn bracket_error_accessors() {
    // AcquireError: only the acquire accessor yields a value.
    let acquire_err: BracketError<&str> = BracketError::AcquireError("failed");
    assert_eq!(acquire_err.acquire_error(), Some(&"failed"));
    assert_eq!(acquire_err.use_error(), None);
    assert_eq!(acquire_err.cleanup_error(), None);

    // UseError: only the use accessor yields a value.
    let use_err: BracketError<&str> = BracketError::UseError("failed");
    assert_eq!(use_err.acquire_error(), None);
    assert_eq!(use_err.use_error(), Some(&"failed"));
    assert_eq!(use_err.cleanup_error(), None);

    // CleanupError: only the cleanup accessor yields a value.
    let cleanup_err: BracketError<&str> = BracketError::CleanupError("failed");
    assert_eq!(cleanup_err.acquire_error(), None);
    assert_eq!(cleanup_err.use_error(), None);
    assert_eq!(cleanup_err.cleanup_error(), Some(&"failed"));

    // Both: use and cleanup accessors each expose their own error.
    let both_err: BracketError<&str> = BracketError::Both {
        use_error: "use",
        cleanup_error: "cleanup",
    };
    assert_eq!(both_err.acquire_error(), None);
    assert_eq!(both_err.use_error(), Some(&"use"));
    assert_eq!(both_err.cleanup_error(), Some(&"cleanup"));
}
/// `BracketError::map` may change the error type: an `i32` payload is
/// converted to a `String` while the variant is preserved.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn bracket_error_map() {
    let err: BracketError<i32> = BracketError::UseError(42);
    let mapped = err.map(|x| x.to_string());
    assert_eq!(mapped, BracketError::UseError("42".to_string()));
}
/// Combining two `Resource`s with `both` exposes them as a pair to the use
/// closure and releases the second before the first.
#[tokio::test]
async fn resource_both_combines_correctly() {
    let release_log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let log_left = Arc::clone(&release_log);
    let log_right = Arc::clone(&release_log);
    let left = Resource::new(pure::<_, String, ()>(1), move |_: i32| {
        log_left.lock().unwrap().push("release_1");
        async { Ok(()) }
    });
    let right = Resource::new(pure::<_, String, ()>(2), move |_: i32| {
        log_right.lock().unwrap().push("release_2");
        async { Ok(()) }
    });
    let combined = left.both(right);
    let sum = combined
        .with(|(a, b): &(i32, i32)| pure::<_, String, ()>(*a + *b))
        .run(&())
        .await;
    assert_eq!(sum, Ok(3));
    assert_eq!(*release_log.lock().unwrap(), vec!["release_2", "release_1"]);
}
/// The `Debug` rendering of `Bracket` names the type and uses the
/// `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_bracket() {
    let bracket_value = Bracket::new(
        pure::<_, String, ()>(42),
        |_: i32| async { Ok::<(), String>(()) },
        |v: &i32| pure::<_, String, ()>(*v),
    );
    let debug_str = format!("{bracket_value:?}");
    assert!(debug_str.contains("Bracket"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of `BracketFull` names the type and uses the
/// `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_bracket_full() {
    let bracket_value = BracketFull::new(
        pure::<_, String, ()>(42),
        |_: i32| async { Ok::<(), String>(()) },
        |v: &i32| pure::<_, String, ()>(*v),
    );
    let debug_str = format!("{bracket_value:?}");
    assert!(debug_str.contains("BracketFull"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of `BracketSync` names the type and uses the
/// `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_bracket_sync() {
    let bracket_value = BracketSync::new(
        pure::<_, String, ()>(42),
        // Synchronous release: returns a plain `Result`, not a future.
        |_: i32| Ok::<(), String>(()),
        |v: &i32| pure::<_, String, ()>(*v),
    );
    let debug_str = format!("{bracket_value:?}");
    assert!(debug_str.contains("BracketSync"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of a directly constructed `Bracket2` names the
/// type and uses the `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_bracket2() {
    // NOTE(review): `use_fn` takes `&&i32` although the acquire effects yield
    // `i32`; only `Debug` is exercised here, so presumably no bound checks it.
    let two_resources = Bracket2 {
        acquire1: pure::<_, String, ()>(1),
        acquire2: pure::<_, String, ()>(2),
        use_fn: |_: &&i32, _: &&i32| pure::<_, String, ()>(0),
        release1: |_: i32| async { Ok::<(), String>(()) },
        release2: |_: i32| async { Ok::<(), String>(()) },
    };
    let debug_str = format!("{two_resources:?}");
    assert!(debug_str.contains("Bracket2"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of a directly constructed `Bracket3` names the
/// type and uses the `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_bracket3() {
    // NOTE(review): `use_fn` takes `&&i32` although the acquire effects yield
    // `i32`; only `Debug` is exercised here, so presumably no bound checks it.
    let three_resources = Bracket3 {
        acquire1: pure::<_, String, ()>(1),
        acquire2: pure::<_, String, ()>(2),
        acquire3: pure::<_, String, ()>(3),
        use_fn: |_: &&i32, _: &&i32, _: &&i32| pure::<_, String, ()>(0),
        release1: |_: i32| async { Ok::<(), String>(()) },
        release2: |_: i32| async { Ok::<(), String>(()) },
        release3: |_: i32| async { Ok::<(), String>(()) },
    };
    let debug_str = format!("{three_resources:?}");
    assert!(debug_str.contains("Bracket3"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of `Resource` names the type and uses the
/// `<effect>` / `<function>` placeholders.
#[test]
fn debug_impl_resource() {
    let resource: Resource<i32, String, ()> =
        Resource::new(pure::<_, String, ()>(42), |_: i32| async { Ok(()) });
    let debug_str = format!("{resource:?}");
    assert!(debug_str.contains("Resource"));
    assert!(debug_str.contains("<effect>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of the value returned by `Resource::with` mentions
/// `ResourceWith` and the `<resource>` / `<function>` placeholders.
#[test]
fn debug_impl_resource_with() {
    let resource: Resource<i32, String, ()> =
        Resource::new(pure::<_, String, ()>(42), |_: i32| async { Ok(()) });
    let with_use = resource.with(|v: &i32| pure::<_, String, ()>(*v));
    let debug_str = format!("{with_use:?}");
    assert!(debug_str.contains("ResourceWith"));
    assert!(debug_str.contains("<resource>"));
    assert!(debug_str.contains("<function>"));
}
/// The `Debug` rendering of an `Acquiring` builder names the type and uses
/// the `<resource>` placeholder.
#[test]
fn debug_impl_acquiring() {
    let builder: Acquiring<i32, String, ()> =
        acquiring(pure::<_, String, ()>(42), |_: i32| async { Ok(()) });
    let debug_str = format!("{builder:?}");
    assert!(debug_str.contains("Acquiring"));
    assert!(debug_str.contains("<resource>"));
}
/// `BracketError::map` applies the function to every payload of every
/// variant, including both fields of `Both`.
#[test]
fn bracket_error_map_all_variants() {
    // Non-capturing closure, so it can be passed by value repeatedly.
    let double = |x: i32| x * 2;

    assert_eq!(
        BracketError::AcquireError(10).map(double),
        BracketError::AcquireError(20)
    );
    assert_eq!(
        BracketError::UseError(10).map(double),
        BracketError::UseError(20)
    );
    assert_eq!(
        BracketError::CleanupError(10).map(double),
        BracketError::CleanupError(20)
    );

    let both = BracketError::Both {
        use_error: 10,
        cleanup_error: 20,
    };
    assert_eq!(
        both.map(double),
        BracketError::Both {
            use_error: 20,
            cleanup_error: 40
        }
    );
}
/// `BracketError` implements `std::error::Error`: every variant reports a
/// source, and `Both` reports the use error as its source.
#[test]
fn bracket_error_std_error_impl() {
    use std::error::Error;

    // Small factory for the wrapped I/O errors used below.
    let io_err = |msg: &'static str| std::io::Error::other(msg);

    let err = BracketError::AcquireError(io_err("acquire"));
    assert!(err.source().is_some());

    let err = BracketError::UseError(io_err("use"));
    assert!(err.source().is_some());

    let err = BracketError::CleanupError(io_err("cleanup"));
    assert!(err.source().is_some());

    let err = BracketError::Both {
        use_error: io_err("use"),
        cleanup_error: io_err("cleanup"),
    };
    let source = err.source().unwrap();
    assert!(source.to_string().contains("use"));
}
/// On the success path `bracket` runs its phases in the order
/// acquire, use, release, and returns the use effect's value.
#[tokio::test]
async fn bracket_execution_order_happy_path() {
    // Each phase appends its name to this shared log.
    let log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let acquire_log = Arc::clone(&log);
    let use_log = Arc::clone(&log);
    let release_log = Arc::clone(&log);
    let outcome = bracket(
        crate::effect::from_fn(move |_: &()| {
            acquire_log.lock().unwrap().push("acquire");
            Ok::<_, String>("resource")
        }),
        move |_: &str| {
            release_log.lock().unwrap().push("release");
            async { Ok(()) }
        },
        move |resource: &&str| {
            use_log.lock().unwrap().push("use");
            pure::<_, String, ()>(format!("used {}", resource))
        },
    )
    .run(&())
    .await;
    assert_eq!(outcome, Ok("used resource".to_string()));
    assert_eq!(
        *log.lock().unwrap(),
        vec!["acquire", "use", "release"],
        "execution order must be acquire -> use -> release"
    );
}
/// When the use effect fails, `bracket` still runs release after use and
/// surfaces the use error.
#[tokio::test]
async fn bracket_execution_order_use_fails() {
    // Each phase appends its name to this shared log.
    let log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let acquire_log = Arc::clone(&log);
    let use_log = Arc::clone(&log);
    let release_log = Arc::clone(&log);
    let outcome = bracket(
        crate::effect::from_fn(move |_: &()| {
            acquire_log.lock().unwrap().push("acquire");
            Ok::<_, String>(42)
        }),
        move |_: i32| {
            release_log.lock().unwrap().push("release");
            async { Ok(()) }
        },
        move |_resource: &i32| {
            use_log.lock().unwrap().push("use");
            fail::<i32, String, ()>("use failed".to_string())
        },
    )
    .run(&())
    .await;
    assert_eq!(outcome, Err("use failed".to_string()));
    assert_eq!(
        *log.lock().unwrap(),
        vec!["acquire", "use", "release"],
        "release MUST run even when use fails"
    );
}
/// When acquisition fails, neither the use nor the release phase is
/// logged, and the acquire error is returned.
#[tokio::test]
async fn bracket_execution_order_acquire_fails() {
    // Each phase appends its name to this shared log.
    let log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let acquire_log = Arc::clone(&log);
    let use_log = Arc::clone(&log);
    let release_log = Arc::clone(&log);
    let outcome = bracket(
        crate::effect::from_fn(move |_: &()| {
            acquire_log.lock().unwrap().push("acquire");
            Err::<i32, _>("acquire failed".to_string())
        }),
        move |_: i32| {
            release_log.lock().unwrap().push("release");
            async { Ok(()) }
        },
        move |_resource: &i32| {
            use_log.lock().unwrap().push("use");
            pure::<_, String, ()>(0)
        },
    )
    .run(&())
    .await;
    assert_eq!(outcome, Err("acquire failed".to_string()));
    assert_eq!(
        *log.lock().unwrap(),
        vec!["acquire"],
        "only acquire should run when it fails"
    );
}
/// The same resource value (identified by its `id`) is observed by the
/// acquire, use and release phases of `bracket`.
#[tokio::test]
async fn bracket_resource_passed_correctly_through_all_phases() {
    #[derive(Clone, Debug, PartialEq)]
    struct TrackedResource {
        id: u32,
    }

    // One slot per phase, recording the id that phase saw.
    let acquired_id = Arc::new(std::sync::Mutex::new(None));
    let used_id = Arc::new(std::sync::Mutex::new(None));
    let released_id = Arc::new(std::sync::Mutex::new(None));
    let seen_by_acquire = Arc::clone(&acquired_id);
    let seen_by_use = Arc::clone(&used_id);
    let seen_by_release = Arc::clone(&released_id);

    let outcome = bracket(
        crate::effect::from_fn(move |_: &()| {
            let resource = TrackedResource { id: 42 };
            *seen_by_acquire.lock().unwrap() = Some(resource.id);
            Ok::<_, String>(resource)
        }),
        move |resource: TrackedResource| {
            *seen_by_release.lock().unwrap() = Some(resource.id);
            async { Ok(()) }
        },
        move |resource: &TrackedResource| {
            *seen_by_use.lock().unwrap() = Some(resource.id);
            pure::<_, String, ()>(resource.id * 2)
        },
    )
    .run(&())
    .await;

    assert_eq!(outcome, Ok(84));
    assert_eq!(*acquired_id.lock().unwrap(), Some(42));
    assert_eq!(*used_id.lock().unwrap(), Some(42));
    assert_eq!(*released_id.lock().unwrap(), Some(42));
}
/// On the success path `bracket_full` runs acquire, use and release in
/// that order and yields an `Ok` result.
#[tokio::test]
async fn bracket_full_execution_order_happy_path() {
    // Each phase appends its name to this shared log.
    let log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let acquire_log = Arc::clone(&log);
    let use_log = Arc::clone(&log);
    let release_log = Arc::clone(&log);
    let result = bracket_full(
        crate::effect::from_fn(move |_: &()| {
            acquire_log.lock().unwrap().push("acquire");
            Ok::<_, String>("resource")
        }),
        move |_: &str| {
            release_log.lock().unwrap().push("release");
            async { Ok(()) }
        },
        move |_resource: &&str| {
            use_log.lock().unwrap().push("use");
            pure::<_, String, ()>("done")
        },
    )
    .run(&())
    .await;
    assert!(result.is_ok());
    assert_eq!(*log.lock().unwrap(), vec!["acquire", "use", "release"]);
}
/// If the third acquisition of `bracket3` fails, the first two resources
/// are released in reverse order (second, then first) and the run errors.
#[tokio::test]
async fn bracket3_releases_first_two_if_third_acquire_fails() {
    let release_log = Arc::new(std::sync::Mutex::new(Vec::new()));
    let log_first = Arc::clone(&release_log);
    let log_second = Arc::clone(&release_log);
    let effect = bracket3(
        pure::<_, String, ()>("first"),
        pure::<_, String, ()>("second"),
        fail::<&str, String, ()>("acquire3 failed".to_string()),
        move |_: &str| {
            log_first.lock().unwrap().push("release_first");
            async { Ok(()) }
        },
        move |_: &str| {
            log_second.lock().unwrap().push("release_second");
            async { Ok(()) }
        },
        // The third release closure does not log anything.
        |_: &str| async { Ok(()) },
        |_: &&str, _: &&str, _: &&str| pure::<_, String, ()>("done"),
    );
    let result = effect.run(&()).await;
    assert!(result.is_err());
    assert_eq!(
        *release_log.lock().unwrap(),
        vec!["release_second", "release_first"]
    );
}
/// If the second acquisition of `bracket3` fails, the run errors and the
/// first resource's release closure has still been invoked.
#[tokio::test]
async fn bracket3_releases_first_if_second_acquire_fails() {
    let released = Arc::new(AtomicBool::new(false));
    let release_flag = Arc::clone(&released);
    let effect = bracket3(
        pure::<_, String, ()>("first"),
        fail::<&str, String, ()>("acquire2 failed".to_string()),
        pure::<_, String, ()>("third"),
        move |_: &str| {
            // Record that the first resource's cleanup ran.
            release_flag.store(true, Ordering::SeqCst);
            async { Ok(()) }
        },
        |_: &str| async { Ok(()) },
        |_: &str| async { Ok(()) },
        |_: &&str, _: &&str, _: &&str| pure::<_, String, ()>("done"),
    );
    let result = effect.run(&()).await;
    assert!(result.is_err());
    assert!(
        released.load(Ordering::SeqCst),
        "first resource must be released when second acquire fails"
    );
}
/// Four builder-acquired resources reach `with_flat4` as four separate
/// references; the use effect sums them.
#[tokio::test]
async fn acquiring_builder_with_flat4_four_resources() {
    let builder = acquiring(pure::<_, String, ()>(1), |_: i32| async { Ok(()) })
        .and(pure::<_, String, ()>(2), |_: i32| async { Ok(()) })
        .and(pure::<_, String, ()>(3), |_: i32| async { Ok(()) })
        .and(pure::<_, String, ()>(4), |_: i32| async { Ok(()) });
    let effect = builder
        .with_flat4(|w: &i32, x: &i32, y: &i32, z: &i32| pure::<_, String, ()>(w + x + y + z));
    let total = effect.run(&()).await;
    assert_eq!(total, Ok(10));
}
}