use quiver_embed::{CollectionId, WalOp};
use quiver_proto::v1::{self, quiver_client::QuiverClient};
use crate::AppState;
pub(crate) fn spawn_follower(state: AppState, leader_url: String, api_key: Option<String>) {
tokio::spawn(async move {
match follow(&state, &leader_url, api_key.as_deref()).await {
Ok(()) => tracing::warn!(
leader = %leader_url,
"replication stream ended; follower is now serving stale read-only state (restart to re-sync)"
),
Err(e) => tracing::error!(
leader = %leader_url,
error = %e,
"replication follower stopped; serving stale read-only state (restart to re-sync)"
),
}
});
}
async fn follow(state: &AppState, leader_url: &str, api_key: Option<&str>) -> Result<(), String> {
let mut client = QuiverClient::connect(leader_url.to_owned())
.await
.map_err(|e| format!("connecting to leader: {e}"))?;
let mut request = tonic::Request::new(v1::ReplicateRequest {});
if let Some(key) = api_key {
let value = format!("Bearer {key}")
.parse()
.map_err(|_| "invalid leader api key".to_owned())?;
request.metadata_mut().insert("authorization", value);
}
let mut stream = client
.replicate(request)
.await
.map_err(|e| format!("opening replication stream: {e}"))?
.into_inner();
tracing::info!(leader = %leader_url, "replication follower connected");
while let Some(op) = stream
.message()
.await
.map_err(|e| format!("reading replication stream: {e}"))?
{
if let Some(wal_op) = proto_to_op(op) {
state
.apply_replicated(wal_op)
.await
.map_err(|e| format!("applying replicated op: {e}"))?;
}
}
Ok(())
}
fn proto_to_op(op: v1::ReplicationOp) -> Option<WalOp> {
use v1::replication_op::Op;
Some(match op.op? {
Op::CreateCollection(c) => WalOp::CreateCollection {
collection_id: CollectionId(c.collection_id),
name: c.name,
descriptor: c.descriptor,
},
Op::DropCollection(d) => WalOp::DropCollection {
collection_id: CollectionId(d.collection_id),
},
Op::Upsert(u) => WalOp::Upsert {
collection_id: CollectionId(u.collection_id),
external_id: u.external_id,
vector: u.vector,
payload: u.payload,
},
Op::Delete(d) => WalOp::Delete {
collection_id: CollectionId(d.collection_id),
external_id: d.external_id,
},
})
}