use std::sync::Arc;
use openraft_macros::since;
use crate::RaftTypeConfig;
use crate::ReadPolicy;
use crate::base::BoxStream;
use crate::batch::Batch;
use crate::core::raft_msg::RaftMsg;
use crate::errors::ClientWriteError;
use crate::errors::Fatal;
use crate::errors::LinearizableReadError;
use crate::impls::ProgressResponder;
use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteResult;
use crate::raft::linearizable_read::Linearizer;
use crate::raft::message::WriteResult;
use crate::raft::message::into_write_result;
use crate::raft::raft_inner::RaftInner;
use crate::raft::responder::core_responder::CoreResponder;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::BatchOf;
use crate::type_config::alias::EntryPayloadOf;
#[cfg(feature = "runtime-stats")]
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::WriteResponderOf;
/// Application-facing entry points for talking to a Raft node.
///
/// `AppApi` is a thin, borrowed view over a shared `RaftInner`: it owns nothing itself and
/// every method on it forwards a `RaftMsg` through that handle (linearizable-read requests
/// and client writes).
#[since(version = "0.10.0")]
pub(crate) struct AppApi<'a, C>
where C: RaftTypeConfig
{
/// Borrowed reference to the shared Raft internals used to send and receive messages.
inner: &'a Arc<RaftInner<C>>,
}
impl<'a, C> AppApi<'a, C>
where C: RaftTypeConfig
{
pub(in crate::raft) fn new(inner: &'a Arc<RaftInner<C>>) -> Self {
Self { inner }
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn get_read_linearizer(
&self,
read_policy: ReadPolicy,
) -> Result<Result<Linearizer<C>, LinearizableReadError<C>>, Fatal<C>> {
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::GetLinearizer { read_policy, tx }, rx).await
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self, payload))]
pub(crate) async fn client_write(
&self,
payload: EntryPayloadOf<C>,
) -> Result<Result<ClientWriteResponse<C>, ClientWriteError<C>>, Fatal<C>> {
let (responder, _commit_rx, complete_rx) = ProgressResponder::new();
self.do_client_write_ff(
Batch::of([payload]),
Batch::of([Some(CoreResponder::Progress(responder))]),
)
.await?;
let res: ClientWriteResult<C> = self.inner.recv_msg(complete_rx).await?;
Ok(res)
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn client_write_ff(
&self,
payload: EntryPayloadOf<C>,
responder: Option<WriteResponderOf<C>>,
) -> Result<(), Fatal<C>> {
self.do_client_write_ff(
Batch::of([payload]),
Batch::of([responder.map(|r| CoreResponder::UserDefined(r))]),
)
.await
}
#[since(version = "0.10.0")]
async fn do_client_write_ff(
&self,
payloads: BatchOf<C, EntryPayloadOf<C>>,
responders: BatchOf<C, Option<CoreResponder<C>>>,
) -> Result<(), Fatal<C>> {
self.inner
.send_msg(RaftMsg::ClientWrite {
payloads,
responders,
expected_leader: None,
#[cfg(feature = "runtime-stats")]
proposed_at: propose_at_now::<C>(),
})
.await?;
Ok(())
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn client_write_many(
&self,
payloads: impl IntoIterator<Item = EntryPayloadOf<C>>,
) -> Result<BoxStream<'static, Result<WriteResult<C>, Fatal<C>>>, Fatal<C>> {
let payloads: Vec<EntryPayloadOf<C>> = payloads.into_iter().collect();
let mut responders = Vec::with_capacity(payloads.len());
let mut receivers = Vec::with_capacity(payloads.len());
for _ in 0..payloads.len() {
let (responder, _commit_rx, complete_rx) = ProgressResponder::<C, ClientWriteResult<C>>::new();
responders.push(Some(CoreResponder::Progress(responder)));
receivers.push(complete_rx);
}
self.do_client_write_ff(Batch::of(payloads), Batch::of(responders)).await?;
let stream = futures_util::stream::unfold(Some(receivers.into_iter()), |opt_iter| async move {
let mut iter = opt_iter?;
let rx = iter.next()?;
match rx.await {
Ok(result) => Some((Ok(into_write_result(result)), Some(iter))),
Err(_) => Some((Err(Fatal::Stopped), None)),
}
});
Ok(Box::pin(stream))
}
}
/// Returns the current instant as reported by `C::now()`.
///
/// Only compiled with the `runtime-stats` feature, where it supplies the `proposed_at`
/// timestamp of `RaftMsg::ClientWrite`.
#[cfg(feature = "runtime-stats")]
pub(crate) fn propose_at_now<C>() -> InstantOf<C>
where C: RaftTypeConfig {
    C::now()
}