use rama::{
Layer as _, Service as _,
layer::{MapErrLayer, MapStateLayer},
};
use tansu_sans_io::{
ApiKey as _, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, OffsetCommitRequest,
OffsetFetchRequest, SyncGroupRequest,
};
use tansu_service::FrameRouteBuilder;
use crate::{
Error,
broker::group::{
heartbeat::HeartbeatService, join::JoinGroupService, leave::LeaveGroupService,
offset_commit::OffsetCommitService, offset_fetch::OffsetFetchService,
sync::SyncGroupService,
},
coordinator::group::Coordinator,
};
pub fn services<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
[
heartbeat,
join_group,
leave_group,
offset_commit,
offset_fetch,
sync_group,
]
.iter()
.try_fold(builder, |builder, service| {
service(builder, coordinator.clone())
})
}
pub fn heartbeat<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
HeartbeatRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(HeartbeatService)
.boxed(),
)
.map_err(Into::into)
}
pub fn join_group<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
JoinGroupRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(JoinGroupService)
.boxed(),
)
.map_err(Into::into)
}
pub fn leave_group<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
LeaveGroupRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(LeaveGroupService)
.boxed(),
)
.map_err(Into::into)
}
pub fn offset_commit<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
OffsetCommitRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(OffsetCommitService)
.boxed(),
)
.map_err(Into::into)
}
pub fn offset_fetch<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
OffsetFetchRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(OffsetFetchService)
.boxed(),
)
.map_err(Into::into)
}
pub fn sync_group<C>(
builder: FrameRouteBuilder<(), Error>,
coordinator: C,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
C: Coordinator,
{
builder
.with_route(
SyncGroupRequest::KEY,
(
MapErrLayer::new(Error::from),
MapStateLayer::new(|_| coordinator),
)
.into_layer(SyncGroupService)
.boxed(),
)
.map_err(Into::into)
}