pub trait ServiceExt<I: Send>: Service<I> {
// Provided methods
fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
where Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
U: Send + Service<O1, Out = Result<O2, E2>>,
O1: Send,
O2: Send,
E1: Send,
E2: Send { ... }
fn except<F>(self, on_err: F) -> Except<Self, F>
where Self: Sized { ... }
fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, Inspect<O, F>>
where Self: Sized + Service<I, Out = Result<O, E>> + Send,
F: Fn(&O) + Send,
O: Send { ... }
fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
where Self: Sized + Send + Clone + 'static,
Self::Out: Send { ... }
fn spawn_each(self) -> SpawnEach<I, Self>
where Self: Sized + Send + Clone + 'static,
Self::Out: Send { ... }
fn flow_scope<O, M, E1, S, F>(
self,
f: F,
s: S,
) -> (Self, Scope<O, M, E1, S, F>)
where F: Fn(&O) -> Result<M, E1>,
Self: Sized + Send,
O: Send,
E1: Send { ... }
fn flow_scope_each<O, M, E1, S, F>(
self,
f: F,
s: S,
) -> (Self, ScopeEach<O, M, E1, S, F>)
where F: Fn(&O) -> Result<M, E1>,
Self: Sized + Send,
E1: Send,
O: Send + Clone { ... }
fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, Map<O2, F>>
where Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
F: FnMut(O1) -> H + Send,
H: Future<Output = O2> + Send,
O1: Send,
O2: Send,
E1: Send { ... }
fn flow_filter_map<O1, O2, E1, F, H>(
self,
f: F,
) -> Left<Self, FilterMap<O2, F>>
where Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
O1: Send,
O2: Send,
E1: Send,
F: FnMut(O1) -> H + Send,
H: Future<Output = Option<O2>> + Send { ... }
}Provided Methods§
fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
fn except<F>(self, on_err: F) -> Except<Self, F>where
Self: Sized,
Sourcefn flow_inspect<O, E, F>(self, f: F) -> Left<Self, Inspect<O, F>>
fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, Inspect<O, F>>
Adds an inspection step that invokes the supplied callback on every successful output of the wrapped service.
This method returns a 2‑tuple consisting of:
self– the original service unchanged.- an
inspect::Inspect<O, E, F>instance that intercepts the service’s output. For each successful result (Ok(o)), the closurefis called with a reference too. The output is then passed through unchanged.
§Parameters
f– A callback implementingFn(&O). The callback receives a reference to the successful output value. It can be used for logging, metrics, or any side‑effect‑only operation.
§Return value
A tuple (Self, inspect::Inspect<O, E, F>) that can be used in a
service pipeline (e.g., within the flow combinator). The first
element is the original service, and the second element is a service
that performs the inspection.
§Example
use flowly_service::{Service, flow::Flow, inspect::Inspect};
let service = MyService::new();
let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
println!("Got value: {:?}", value);
});
let flow = Flow::from(orig).and(inspector);Sourcefn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
Creates a concurrent wrapper around the current service that limits the number of parallel executions.
This method returns a ConcurrentEach<I, Self> instance that delegates work to a pool
of worker tasks. Each worker runs the underlying service independently, allowing
multiple inputs to be processed concurrently. The limit argument controls the
maximum number of worker tasks that may run in parallel.
Parameters
self: The service instance to be wrapped. It must implementService<I>and satisfySend,Clone, and'staticbounds.limit: The maximum number of concurrent worker tasks to spawn. Iflimitis greater than the current number of tasks, new tasks will be created up to this bound.
Return value
A ConcurrentEach<I, Self> which itself implements Service. When handling an
input, it forwards the input to one of the available workers and returns a stream
of results that can be awaited asynchronously.
Sourcefn spawn_each(self) -> SpawnEach<I, Self>
fn spawn_each(self) -> SpawnEach<I, Self>
Creates a new SpawnEach wrapper around the current service.
The wrapper spawns a separate task for each input message, forwarding
the results through a bounded mpsc channel. This allows the
underlying service to process messages concurrently without
blocking the caller.
§Parameters
self– The service instance to wrap. The service must implementService<I>for some input typeI.
§Constraints
Self: Sized + Send + Clone + 'static– The service must be clonable and safe to send across threads.Self::Out: Send– The output type of the service must beSendbecause it will be transported across channels.
§Return value
Returns a SpawnEach<I, Self> that implements Service<I> with
the same input type. The new service can be used just like the
original one, but each invocation of handle will spawn a
dedicated task.
§Example
use flowly_service::{Service, spawn_each};
struct MyService;
impl Service<u32> for MyService {
type Out = Result<String, std::io::Error>;
fn handle(&mut self, input: u32, _cx: &crate::Context)
-> impl futures::Stream<Item = Self::Out> + Send
{
// …
}
}
let service = MyService;
// Wrap in SpawnEach
let concurrent_service = service.spawn_each();
// Now `concurrent_service` can be used as a Service and will process
// each input concurrently.§Note
The default message buffer size is 2.
Sourcefn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
This method consumes the current service and returns a tuple containing the original service and a new
Scope service that forwards transformed messages to s. The transformation function f receives
a reference to the original input O and returns either a message M for s or an error E1.\
§Type Parameters
O: Type of the original input that will be received by the outer service.M: Type of the message thatsexpects.E1: Error type returned by the transformation functionf.S: The inner service that will handle the transformed messages.F: Function or closure of typeFn(&O) -> Result<M, E1>.
§Parameters
self– The current service (moved into the returned tuple).f– Function that transforms&OintoResult<M, E1>.s– The inner service to be invoked after successful transformation.
§Returns
A tuple (Self, Scope<O, M, E1, S, F>) where:\n
Selfis the original service that can continue to be used.\nScope<O, M, E1, S, F>is a new service that:\n- Calls
fwith the incoming input.\n - If
freturnsOk(m), forwardsmtosand collects all emitted outputs intoVec<O>.\n - If
freturnsErr(e), immediately returns an error wrapped inEither::Right(e)without invokings.\n
- Calls
§Example
let (service, scoped) = flow_scope(service, |msg: &Input| {
if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
}, inner_service);§Constraints
All involved types must be Send, and Self must implement Sized + Send.
fn flow_scope_each<O, M, E1, S, F>( self, f: F, s: S, ) -> (Self, ScopeEach<O, M, E1, S, F>)
fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, Map<O2, F>>
fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, FilterMap<O2, F>>
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.