pub struct FetchService;Expand description
A Service using Storage as Context taking FetchRequest returning FetchResponse.
use rama::{Context, Layer as _, Service as _, layer::MapStateLayer};
use tansu_sans_io::{
CreateTopicsRequest, ErrorCode, FetchRequest,
create_topics_request::CreatableTopic,
fetch_request::{FetchPartition, FetchTopic},
};
use tansu_storage::{CreateTopicsService, Error, FetchService, StorageContainer};
use url::Url;
const CLUSTER_ID: &str = "tansu";
const NODE_ID: i32 = 111;
const HOST: &str = "localhost";
const PORT: i32 = 9092;
let storage = StorageContainer::builder()
.cluster_id(CLUSTER_ID)
.node_id(NODE_ID)
.advertised_listener(Url::parse(&format!("tcp://{HOST}:{PORT}"))?)
.storage(Url::parse("memory://tansu/")?)
.build()
.await?;
let create_topic = {
let storage = storage.clone();
MapStateLayer::new(|_| storage).into_layer(CreateTopicsService)
};
let name = "abcba";
let response = create_topic
.serve(
Context::default(),
CreateTopicsRequest::default()
.topics(Some(vec![
CreatableTopic::default()
.name(name.into())
.num_partitions(5)
.replication_factor(3)
.assignments(Some([].into()))
.configs(Some([].into())),
]))
.validate_only(Some(false)),
)
.await?;
let topics = response.topics.unwrap_or_default();
assert_eq!(1, topics.len());
assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
let fetch = {
let storage = storage.clone();
MapStateLayer::new(|_| storage).into_layer(FetchService)
};
let partition = 0;
let response = fetch
.serve(
Context::default(),
FetchRequest::default()
.topics(Some(
[FetchTopic::default()
.topic(Some(name.into()))
.partitions(Some(
[FetchPartition::default().partition(partition)].into(),
))]
.into(),
))
.max_bytes(Some(0))
.max_wait_ms(5_000),
)
.await?;
let topics = response.responses.as_deref().unwrap_or_default();
assert_eq!(1, topics.len());
let partitions = topics[0].partitions.as_deref().unwrap_or_default();
assert_eq!(1, partitions.len());
assert_eq!(
ErrorCode::None,
ErrorCode::try_from(partitions[0].error_code)?
);Trait Implementations§
Source§impl Clone for FetchService
impl Clone for FetchService
Source§fn clone(&self) -> FetchService
fn clone(&self) -> FetchService
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreSource§impl Debug for FetchService
impl Debug for FetchService
Source§impl Default for FetchService
impl Default for FetchService
Source§fn default() -> FetchService
fn default() -> FetchService
Returns the “default value” for a type. Read more
Source§impl Hash for FetchService
impl Hash for FetchService
Source§impl Ord for FetchService
impl Ord for FetchService
Source§fn cmp(&self, other: &FetchService) -> Ordering
fn cmp(&self, other: &FetchService) -> Ordering
1.21.0 · Source§fn max(self, other: Self) -> Selfwhere
Self: Sized,
fn max(self, other: Self) -> Selfwhere
Self: Sized,
Compares and returns the maximum of two values. Read more
Source§impl PartialEq for FetchService
impl PartialEq for FetchService
Source§impl PartialOrd for FetchService
impl PartialOrd for FetchService
Source§impl<G> Service<G, FetchRequest> for FetchServicewhere
G: Storage,
impl<G> Service<G, FetchRequest> for FetchServicewhere
G: Storage,
impl Copy for FetchService
impl Eq for FetchService
impl StructuralPartialEq for FetchService
Auto Trait Implementations§
impl Freeze for FetchService
impl RefUnwindSafe for FetchService
impl Send for FetchService
impl Sync for FetchService
impl Unpin for FetchService
impl UnsafeUnpin for FetchService
impl UnwindSafe for FetchService
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<Q, K> Comparable<K> for Q
impl<Q, K> Comparable<K> for Q
Source§impl<Q, K> Equivalent<K> for Q
impl<Q, K> Equivalent<K> for Q
Source§fn equivalent(&self, key: &K) -> bool
fn equivalent(&self, key: &K) -> bool
Compare self to
key and return true if they are equal.Source§impl<Q, K> Equivalent<K> for Q
impl<Q, K> Equivalent<K> for Q
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more