pub enum Subscribe<'a> {
Initial {
client: &'a Ax,
request: SubscribeRequest,
},
Pending(BoxFuture<'a, Result<BoxStream<'static, SubscribeResponse>>>),
Void,
}
Expand description
Variants§
Implementations§
Source§impl<'a> Subscribe<'a>
impl<'a> Subscribe<'a>
Sourcepub fn with_lower_bound(self, lower_bound: OffsetMap) -> Self
pub fn with_lower_bound(self, lower_bound: OffsetMap) -> Self
Add a (exclusive) lower bound to the subscription query.
For more information on offsets, as well as lower and upper bounds refer to the offsets and partitions documentation page.
The lower bound limits the start of the query events. As an example, consider the following (example) events:
{ "offset": 1, "event": { "temperature": 10 } }
{ "offset": 3, "event": { "temperature": 12 } }
{ "offset": 14, "event": { "temperature": 9 } }
If you set the lower bound to 10
, the first event to be returned
would be the last of the example.
§Panics
Calling this function after polling Subscribe
will result in a panic.
§Example
use ax_sdk::{Ax, AxOpts};
use futures::stream::StreamExt;
async fn lower_bound_example() {
let service = Ax::new(AxOpts::default()).await.unwrap();
let present_offsets = service.offsets().await.unwrap().present;
let mut response = service.subscribe("FROM allEvents")
.with_lower_bound(present_offsets)
.await
.unwrap();
while let Some(event) = response.next().await {
println!("{:?}", event);
}
}
Generating an OffsetMap
out of thin air is usually not possible because they
require stream IDs — which require knowledge of the streams and so on.
Hence, a more involved and useful example requires you to perform a query to
get an offset map when the query finishes streaming all results.
use ax_sdk::{types::{tags, Offset, service::QueryResponse}, Ax, AxOpts};
use futures::stream::StreamExt;
async fn lower_bound_example() {
let service = Ax::new(AxOpts::default()).await.unwrap();
// We're publishing events for a completely functional example
let publish_response = service
.publish()
.event(
tags!("temperature", "sensor:temp-sensor1"),
&serde_json::json!({ "temperature": 10 }),
).unwrap()
.event(
tags!("temperature", "sensor:temp-sensor2"),
&serde_json::json!({ "temperature": 21 }),
).unwrap()
.event(
tags!("temperature", "sensor:temp-sensor3"),
&serde_json::json!({ "temperature": 40 }),
).unwrap()
.await.unwrap();
// Query for the "halfway" event
let mut query_response = service
.query("FROM 'sensor:temp-sensor2'")
.await
.unwrap();
// This loop is a dirty hack for demonstration purposes
// in real world usage you will most likely be using the events
// and keeping the offset map in the end.
let offsets = loop {
let result = query_response.next().await.unwrap();
if let QueryResponse::Offsets(offsets) = result {
break offsets.offsets;
}
};
// Subcribe for all 'temperature' events with the previous query `OffsetMap`
// as a lower bound. We're expecting to only see events after the "halfway"
// event — {"temperature"}
let mut subscribe_response = service
.subscribe("FROM 'temperature'")
.with_lower_bound(offsets.clone())
.await.unwrap();
while let Some(response) = query_response.next().await {
println!("{:?}", response);
}
}
Trait Implementations§
Source§impl<'a> FusedFuture for Subscribe<'a>
impl<'a> FusedFuture for Subscribe<'a>
Source§fn is_terminated(&self) -> bool
fn is_terminated(&self) -> bool
Returns
true
if the underlying future should no longer be polled.Auto Trait Implementations§
impl<'a> Freeze for Subscribe<'a>
impl<'a> !RefUnwindSafe for Subscribe<'a>
impl<'a> Send for Subscribe<'a>
impl<'a> !Sync for Subscribe<'a>
impl<'a> Unpin for Subscribe<'a>
impl<'a> !UnwindSafe for Subscribe<'a>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn map<U, F>(self, f: F) -> Map<Self, F>
fn map<U, F>(self, f: F) -> Map<Self, F>
Map this future’s output to a different type, returning a new future of
the resulting type. Read more
Source§fn map_into<U>(self) -> MapInto<Self, U>
fn map_into<U>(self) -> MapInto<Self, U>
Map this future’s output to a different type, returning a new future of
the resulting type. Read more
Source§fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
Chain on a computation for when a future finished, passing the result of
the future to the provided closure
f
. Read moreSource§fn left_future<B>(self) -> Either<Self, B>
fn left_future<B>(self) -> Either<Self, B>
Source§fn right_future<A>(self) -> Either<A, Self>
fn right_future<A>(self) -> Either<A, Self>
Source§fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
Convert this future into a single element stream. Read more
Source§fn flatten(self) -> Flatten<Self>
fn flatten(self) -> Flatten<Self>
Flatten the execution of this future when the output of this
future is itself another future. Read more
Source§fn flatten_stream(self) -> FlattenStream<Self>
fn flatten_stream(self) -> FlattenStream<Self>
Flatten the execution of this future when the successful result of this
future is a stream. Read more
Source§fn fuse(self) -> Fuse<Self>where
Self: Sized,
fn fuse(self) -> Fuse<Self>where
Self: Sized,
Fuse a future such that
poll
will never again be called once it has
completed. This method can be used to turn any Future
into a
FusedFuture
. Read moreSource§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
Do something with the output of a future before passing it on. Read more
Source§fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
Catches unwinding panics while polling the future. Read more
Create a cloneable handle to this future where all handles will resolve
to the same result. Read more
Source§fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)where
Self: Sized,
fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)where
Self: Sized,
Turn this future into a future that yields
()
on completion and sends
its output to another future on a separate task. Read moreSource§fn boxed<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>
fn boxed<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>
Wrap the future in a Box, pinning it. Read more
Source§fn boxed_local<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>>where
Self: Sized + 'a,
fn boxed_local<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>>where
Self: Sized + 'a,
Wrap the future in a Box, pinning it. Read more
Source§fn unit_error(self) -> UnitError<Self>where
Self: Sized,
fn unit_error(self) -> UnitError<Self>where
Self: Sized,
Turns a
Future<Output = T>
into a
TryFuture<Ok = T, Error = ()
>.Source§fn never_error(self) -> NeverError<Self>where
Self: Sized,
fn never_error(self) -> NeverError<Self>where
Self: Sized,
Turns a
Future<Output = T>
into a
TryFuture<Ok = T, Error = Never
>.Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<F> IntoFuture for Fwhere
F: Future,
impl<F> IntoFuture for Fwhere
F: Future,
Source§type IntoFuture = F
type IntoFuture = F
Which kind of future are we turning this into?
Source§fn into_future(self) -> <F as IntoFuture>::IntoFuture
fn into_future(self) -> <F as IntoFuture>::IntoFuture
Creates a future from a value. Read more
Source§impl<T> References<RawCodec> for T
impl<T> References<RawCodec> for T
Source§impl<Fut> TryFutureExt for Fut
impl<Fut> TryFutureExt for Fut
Source§fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
Source§fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
Maps this future’s success value to a different value. Read more
Source§fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
Maps this future’s success value to a different value, and permits for error handling resulting in the same type. Read more
Source§fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
Maps this future’s error value to a different value. Read more
Source§fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
Executes another future after this one resolves successfully. The
success value is passed to a closure to create this subsequent future. Read more
Source§fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
Executes another future if this one resolves to an error. The
error value is passed to a closure to create this subsequent future. Read more
Source§fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
Do something with the success value of a future before passing it on. Read more
Source§fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
Do something with the error value of a future before passing it on. Read more
Source§fn try_flatten(self) -> TryFlatten<Self, Self::Ok>
fn try_flatten(self) -> TryFlatten<Self, Self::Ok>
Flatten the execution of this future when the successful result of this
future is another future. Read more
Source§fn try_flatten_stream(self) -> TryFlattenStream<Self>
fn try_flatten_stream(self) -> TryFlattenStream<Self>
Flatten the execution of this future when the successful result of this
future is a stream. Read more