sorock 0.12.0

A Multi-Raft implementation in Rust
Documentation
use super::*;

/// Converts a `request::ReplicationStream` into a stream of
/// `raft::ReplicationStreamChunk` values.
///
/// The resulting stream yields exactly one header chunk first, built from
/// `shard_index`, `st.sender_id` and `st.prev_clock`, followed by one chunk
/// for every item produced by `st.entries`.
///
/// Each item of `st.entries` is optional: a present entry becomes an
/// `Elem::Entry` carrying its clock and command, while an absent one is
/// forwarded as a chunk whose `elem` is `None`.
pub fn into_external_replication_stream(
    shard_index: ShardIndex,
    st: request::ReplicationStream,
) -> impl futures::stream::Stream<Item = raft::ReplicationStreamChunk> {
    use raft::replication_stream_chunk::Elem;

    // Build the single leading header element before `st.entries` is moved out.
    let header = raft::ReplicationStreamHeader {
        shard_index,
        sender_id: st.sender_id.to_string(),
        prev_clock: Some(raft::Clock {
            term: st.prev_clock.term,
            index: st.prev_clock.index,
        }),
    };
    let head = futures::stream::iter(std::iter::once(Some(Elem::Header(header))));

    // Translate every (optional) entry into its chunk-element counterpart.
    let body = st.entries.map(|maybe_entry| {
        maybe_entry.map(|entry| {
            let clock = raft::Clock {
                term: entry.this_clock.term,
                index: entry.this_clock.index,
            };
            Elem::Entry(raft::ReplicationStreamEntry {
                clock: Some(clock),
                command: entry.command,
            })
        })
    });

    // Header first, then the entries, each wrapped into a chunk message.
    head.chain(body)
        .map(|elem| raft::ReplicationStreamChunk { elem })
}

pub fn into_internal_snapshot_stream(
    out_stream: impl Stream<Item = Result<raft::SnapshotChunk, tonic::Status>>,
) -> impl Stream<Item = Result<Bytes>> {
    out_stream.map(|result| {
        result
            .map(|chunk| chunk.data.into())
            .map_err(|e| Error::BadSnapshotChunk(e).into())
    })
}