use std::str;
use std::sync::Arc;
use std::vec::Vec;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use crate::cluster::node;
use crate::cluster::Node;
use crate::commands::Message;
use crate::errors::{Error, Result};
use crate::net::Connection;
use crate::AdminPolicy;
use super::PartitionTable;
#[derive(Debug, Clone)]
pub struct PartitionTokenizer {
buffer: Vec<u8>,
}
impl PartitionTokenizer {
pub async fn new(
policy: &AdminPolicy,
conn: &mut Connection,
node: &Arc<Node>,
) -> Result<Self> {
let command = "replicas";
let info_map = Message::info(policy, conn, &[command, node::PARTITION_GENERATION]).await?;
if let Some(buf) = info_map.get(command) {
return Ok(PartitionTokenizer {
buffer: buf.as_bytes().to_owned(),
});
}
node.update_partitions(&info_map)?;
Err(Error::BadResponse("Missing replicas info".to_string()))
}
pub fn update_partition(&self, nmap: &mut PartitionTable, node: &Arc<Node>) -> Result<()> {
let part_str = str::from_utf8(&self.buffer)?;
for part in part_str.trim_end().split(';') {
match part.split_once(':') {
Some((ns, info)) => {
let mut info_section = info.split(',');
let reigime = info_section
.next()
.ok_or_else(|| Error::BadResponse("Missing regime".to_string()))?
.parse()
.map_err(|err| Error::BadResponse(format!("Invalid regime: {err}")))?;
let n_replicas = info_section
.next()
.ok_or_else(|| Error::BadResponse("Missing replicas count".to_string()))?
.parse()
.map_err(|err| {
Error::BadResponse(format!("Invalid replicas count: {err}"))
})?;
let entry = nmap.entry(ns.to_string()).or_default();
if entry.replicas != n_replicas
&& reigime
>= entry
.nodes
.iter()
.map(|(r, _)| *r)
.max()
.unwrap_or_default()
{
let wanted_size = n_replicas * node::PARTITIONS;
entry.nodes.resize_with(wanted_size, || (0, None));
entry.replicas = n_replicas;
}
for (section, replica) in
info_section.zip(entry.nodes.chunks_mut(node::PARTITIONS))
{
let restore_buffer = BASE64.decode(section)?;
for (idx, (this_reigimes, item)) in replica.iter_mut().enumerate() {
if restore_buffer[idx >> 3] & (0x80 >> (idx & 7) as u8) != 0
&& reigime >= *this_reigimes
{
*item = Some(node.clone());
*this_reigimes = reigime;
}
}
}
}
_ => {
return Err(Error::BadResponse(
"Error parsing partition info".to_string(),
))
}
}
}
Ok(())
}
}