use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tracing::info;
use crate::error::{KubericError, Result};
use crate::events::StateProviderEvent;
use crate::handles::PartitionState;
use crate::types::{Lsn, OperationStream, ReplicaInfo};
pub(crate) async fn run_build_replica_copy(
replica: ReplicaInfo,
state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
state: Arc<PartitionState>,
reply_timeout: Duration,
) -> Result<()> {
use crate::proto::replicator_data_client::ReplicatorDataClient;
let secondary_addr = &replica.replicator_address;
info!(
secondary_id = replica.id,
%secondary_addr,
"BuildReplica: connecting to secondary data plane"
);
let channel = tonic::transport::Channel::from_shared(secondary_addr.clone())
.map_err(|e| KubericError::Internal(Box::new(e)))?
.connect()
.await
.map_err(|e| KubericError::Internal(Box::new(e)))?;
let mut data_client = ReplicatorDataClient::new(channel);
let ctx_resp = data_client
.get_copy_context(crate::proto::GetCopyContextRequest {})
.await
.map_err(|e| KubericError::Internal(Box::new(e)))?;
let copy_context_ops = ctx_resp.into_inner().operations;
info!(
context_items = copy_context_ops.len(),
"BuildReplica: got copy context from secondary"
);
let copy_context = vec_to_stream(
copy_context_ops
.into_iter()
.map(|op| (op.lsn, bytes::Bytes::from(op.data)))
.collect(),
);
let up_to_lsn = state.committed_lsn();
let state_stream: OperationStream = {
let (tx, rx) = oneshot::channel();
state_provider_tx
.send(StateProviderEvent::GetCopyState {
up_to_lsn,
copy_context,
reply: tx,
})
.map_err(|_| KubericError::Closed)?;
match tokio::time::timeout(reply_timeout, rx).await {
Ok(Ok(result)) => result?,
Ok(Err(_)) => return Err(KubericError::Closed),
Err(_) => return Err(KubericError::Internal("state_provider timeout".into())),
}
};
let state_ops = collect_stream(state_stream).await;
let copy_lsn = state_ops.iter().map(|(lsn, _)| *lsn).max().unwrap_or(0);
state.set_copy_lsn(replica.id, copy_lsn);
info!(
items = state_ops.len(),
up_to_lsn, copy_lsn, "BuildReplica: got copy state from local StateProvider"
);
let items: Vec<crate::proto::CopyItem> = state_ops
.into_iter()
.map(|(lsn, data)| crate::proto::CopyItem {
lsn,
data: data.to_vec(),
})
.collect();
let item_stream = tokio_stream::iter(items);
let resp = data_client
.copy_stream(item_stream)
.await
.map_err(|e| KubericError::Internal(Box::new(e)))?;
info!(
items_received = resp.into_inner().items_received,
"BuildReplica: copy complete"
);
Ok(())
}
async fn collect_stream(mut stream: OperationStream) -> Vec<(Lsn, bytes::Bytes)> {
let mut ops = Vec::new();
while let Some(op) = stream.get_operation().await {
ops.push((op.lsn, op.data.clone()));
op.acknowledge();
}
ops
}
fn vec_to_stream(ops: Vec<(Lsn, bytes::Bytes)>) -> OperationStream {
let (tx, stream) = OperationStream::channel(ops.len().max(1));
tokio::spawn(async move {
for (lsn, data) in ops {
let op = crate::types::Operation::new(lsn, data, None);
if tx.send(op).await.is_err() {
break;
}
}
});
stream
}