pub enum SubscribeMonotonic<'a> {
    Initial {
        client: &'a Ax,
        request: SubscribeMonotonicRequest,
    },
    Pending(BoxFuture<'a, Result<BoxStream<'static, SubscribeMonotonicResponse>>>),
    Void,
}
Request builder for monotonic subscriptions.
Monotonic subscriptions keep track of the highest sort order (LamportTimestamp and StreamId) seen so far, ending the stream with a SubscribeMonotonicResponse::TimeTravel message if the next event would be out of order.
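The tracking rule described above can be sketched in a few lines. This is only an illustrative model, not the SDK's implementation: SortKey, Response, and monotonic are hypothetical names, the sort key is reduced to a plain (u64, u32) tuple standing in for (LamportTimestamp, StreamId), and the TimeTravel payload here simply carries the offending key.

```rust
/// Hypothetical sort key: (Lamport timestamp, stream id), compared lexicographically.
pub type SortKey = (u64, u32);

/// Hypothetical, simplified response type for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Response {
    Event(SortKey),
    TimeTravel(SortKey),
}

/// Emits events while their sort keys keep increasing. The first key that is
/// lower than the highest key seen so far produces a TimeTravel message and
/// ends the stream.
pub fn monotonic(keys: &[SortKey]) -> Vec<Response> {
    let mut highest: Option<SortKey> = None;
    let mut out = Vec::new();
    for &key in keys {
        if let Some(h) = highest {
            if key < h {
                out.push(Response::TimeTravel(key));
                break; // nothing is delivered after a time travel
            }
        }
        highest = Some(key);
        out.push(Response::Event(key));
    }
    out
}
```

In this model, the keys (1, 0), (2, 0), (2, 1), (1, 5), (9, 0) yield three events followed by a TimeTravel for (1, 5); the trailing (9, 0) is never delivered.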
Warning: SubscribeMonotonic implements the Future trait, so it can be polled. Calling any SubscribeMonotonic function after it has been polled will result in a panic!
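To illustrate why builder functions panic after polling, here is a minimal, self-contained model of a builder that is also a future. It mirrors the Initial / Pending / Void shape of the enum above, but everything in it is an assumption made for illustration: Builder, Request, and poll_once are hypothetical names, the lower bound is a plain u64 instead of an OffsetMap, and the "request" resolves immediately instead of talking to a node.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the real request type.
#[derive(Debug, Default, PartialEq)]
pub struct Request {
    pub lower_bound: Option<u64>,
}

/// Simplified model of a request builder that is also a future.
pub enum Builder {
    Initial { request: Request },
    Pending(Pin<Box<dyn Future<Output = Request> + Send>>),
    Void,
}

impl Builder {
    pub fn new() -> Self {
        Builder::Initial { request: Request::default() }
    }

    /// Only valid while the builder is still in the Initial state.
    pub fn with_lower_bound(mut self, lower_bound: u64) -> Self {
        match &mut self {
            Builder::Initial { request } => request.lower_bound = Some(lower_bound),
            _ => panic!("with_lower_bound called after polling"),
        }
        self
    }
}

impl Future for Builder {
    type Output = Request;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Request> {
        loop {
            // Take the current state out, leaving Void behind meanwhile.
            match mem::replace(&mut *self, Builder::Void) {
                Builder::Initial { request } => {
                    // The first poll consumes the request; it can no longer be edited.
                    *self = Builder::Pending(Box::pin(async move { request }));
                }
                Builder::Pending(mut fut) => {
                    return match fut.as_mut().poll(cx) {
                        Poll::Ready(out) => Poll::Ready(out), // state stays Void
                        Poll::Pending => {
                            *self = Builder::Pending(fut);
                            Poll::Pending
                        }
                    };
                }
                Builder::Void => panic!("polled after completion"),
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with a waker that does nothing.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

In this model, Builder::new().with_lower_bound(10) is fine, but once poll_once has been called on the builder its state is no longer Initial, so a further with_lower_bound call panics. The practical takeaway is to finish all builder calls before the first .await.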
Variants§
Implementations§
impl<'a> SubscribeMonotonic<'a>
pub fn with_lower_bound(self, lower_bound: OffsetMap) -> Self
Add an exclusive lower bound to the subscription query.
For more information on offsets, as well as lower and upper bounds, refer to the offsets and partitions documentation page.
The lower bound limits where the query's events start. As an example, consider the following events:
{ "offset": 1, "event": { "temperature": 10 } }
{ "offset": 3, "event": { "temperature": 12 } }
{ "offset": 14, "event": { "temperature": 9 } }
If you set the lower bound to 10, the first event to be returned would be the last of the example (offset 14), because the bound is exclusive and the events at offsets 1 and 3 do not lie above it.
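The exclusive-bound behaviour for the three example events can be sketched as a plain filter. This is a simplified model under stated assumptions, not the SDK's logic: Event, example_events, and after_lower_bound are hypothetical names, and a single stream with bare numeric offsets stands in for a full OffsetMap.

```rust
/// Hypothetical, simplified event: one stream, plain numeric offsets.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub offset: u64,
    pub temperature: i64,
}

/// The three example events from the text above.
pub fn example_events() -> Vec<Event> {
    vec![
        Event { offset: 1, temperature: 10 },
        Event { offset: 3, temperature: 12 },
        Event { offset: 14, temperature: 9 },
    ]
}

/// Keeps only events strictly above the (exclusive) lower bound.
pub fn after_lower_bound(events: &[Event], lower_bound: u64) -> Vec<Event> {
    events
        .iter()
        .filter(|e| e.offset > lower_bound)
        .cloned()
        .collect()
}
```

With a bound of 10 only the event at offset 14 remains. A bound of 3 gives the same result, since the event at offset 3 is itself excluded.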
§Panics
Calling this function after polling SubscribeMonotonic will result in a panic.
§Example
use ax_sdk::{Ax, AxOpts};
use futures::stream::StreamExt;

async fn lower_bound_example() {
    let service = Ax::new(AxOpts::default()).await.unwrap();
    // Use the offsets currently known to the node as the (exclusive) lower bound
    let present_offsets = service.offsets().await.unwrap().present;
    let mut response = service
        .subscribe_monotonic("FROM allEvents")
        .with_lower_bound(present_offsets)
        .await
        .unwrap();
    while let Some(event) = response.next().await {
        println!("{:?}", event);
    }
}
Generating an OffsetMap out of thin air is usually not possible because it requires stream IDs, which in turn require knowledge of the streams, and so on. Hence, a more involved and useful example requires you to perform a query to get an offset map when the query finishes streaming all results.
use ax_sdk::{types::{tags, Offset, service::QueryResponse}, Ax, AxOpts};
use futures::stream::StreamExt;

async fn lower_bound_example() {
    let service = Ax::new(AxOpts::default()).await.unwrap();
    // We're publishing events for a completely functional example
    let publish_response = service
        .publish()
        .event(
            tags!("temperature", "sensor:temp-sensor1"),
            &serde_json::json!({ "temperature": 10 }),
        ).unwrap()
        .event(
            tags!("temperature", "sensor:temp-sensor2"),
            &serde_json::json!({ "temperature": 21 }),
        ).unwrap()
        .event(
            tags!("temperature", "sensor:temp-sensor3"),
            &serde_json::json!({ "temperature": 40 }),
        ).unwrap()
        .await.unwrap();
    // Query for the "halfway" event
    let mut query_response = service
        .query("FROM 'sensor:temp-sensor2'")
        .await
        .unwrap();
    // This loop is a dirty hack for demonstration purposes;
    // in real-world usage you will most likely be using the events
    // and keeping the offset map in the end.
    let offsets = loop {
        let result = query_response.next().await.unwrap();
        if let QueryResponse::Offsets(offsets) = result {
            break offsets.offsets;
        }
    };
    // Subscribe to all 'temperature' events with the previous query `OffsetMap`
    // as a lower bound. We're expecting to only see events after the "halfway"
    // event: {"temperature"}
    let mut subscribe_response = service
        .subscribe_monotonic("FROM 'temperature'")
        .with_lower_bound(offsets)
        .await.unwrap();
    // Read from the subscription stream, not from the earlier query stream
    while let Some(response) = subscribe_response.next().await {
        println!("{:?}", response);
    }
}
Trait Implementations§
impl<'a> FusedFuture for SubscribeMonotonic<'a>
fn is_terminated(&self) -> bool
Returns true if the underlying future should no longer be polled.
Auto Trait Implementations§
impl<'a> Freeze for SubscribeMonotonic<'a>
impl<'a> !RefUnwindSafe for SubscribeMonotonic<'a>
impl<'a> Send for SubscribeMonotonic<'a>
impl<'a> !Sync for SubscribeMonotonic<'a>
impl<'a> Unpin for SubscribeMonotonic<'a>
impl<'a> !UnwindSafe for SubscribeMonotonic<'a>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn map<U, F>(self, f: F) -> Map<Self, F>
fn map_into<U>(self) -> MapInto<Self, U>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
Chain on a computation for when a future finished, passing the result of the future to the provided closure f.
fn left_future<B>(self) -> Either<Self, B>
fn right_future<A>(self) -> Either<A, Self>
fn into_stream(self) -> IntoStream<Self>
where
    Self: Sized,
fn flatten(self) -> Flatten<Self>
fn flatten_stream(self) -> FlattenStream<Self>
fn fuse(self) -> Fuse<Self>
where
    Self: Sized,
Fuse a future such that poll will never again be called once it has completed. This method can be used to turn any Future into a FusedFuture.
fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn catch_unwind(self) -> CatchUnwind<Self>
where
    Self: Sized + UnwindSafe,
fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
where
    Self: Sized,
Turn this future into a future that yields () on completion and sends its output to another future on a separate task.
fn boxed<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>
fn boxed_local<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>>
where
    Self: Sized + 'a,
fn unit_error(self) -> UnitError<Self>
where
    Self: Sized,
Turns a Future<Output = T> into a TryFuture<Ok = T, Error = ()>.
fn never_error(self) -> NeverError<Self>
where
    Self: Sized,
Turns a Future<Output = T> into a TryFuture<Ok = T, Error = Never>.