use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::sync::Arc;
use crate::cluster::{Cluster, Node};
use crate::commands::{Command, SingleCommand};
use crate::errors::{Error, Result};
use crate::net::Connection;
use crate::policy::{BasePolicy, Policy, Replica};
use crate::value::bytes_to_particle;
use crate::{Bins, Key, Record, ResultCode, Value};
/// Command that reads a single record and stores the parsed result on itself.
///
/// The command borrows its policy and key for `'a`; the outcome of a successful
/// read is left in `record`.
pub struct ReadCommand<'a> {
/// Underlying single-key command, built from the cluster, key and replica
/// passed to `new`; used for node selection, the hint and the key.
pub single_command: SingleCommand<'a>,
/// Result of the read: `None` on construction, set to `Some(..)` when the
/// server answers with `ResultCode::Ok`.
pub record: Option<Record>,
// Policy used for the server timeout, for encoding the request and for execution.
policy: &'a BasePolicy,
// Bin selection sent with the request; when `is_none()` holds, bin data in the
// response is not parsed.
bins: Bins,
}
impl<'a> ReadCommand<'a> {
    /// Creates a read command for `key`.
    ///
    /// * `policy` - policy used for timeouts, request encoding and execution.
    /// * `cluster`, `key`, `replica` - forwarded to [`SingleCommand::new`].
    /// * `bins` - bin selection sent with the request.
    ///
    /// The command starts with no record; `record` is populated once a
    /// successful response has been parsed.
    pub fn new(
        policy: &'a BasePolicy,
        cluster: Arc<Cluster>,
        key: &'a Key,
        bins: Bins,
        replica: Replica,
    ) -> Self {
        ReadCommand {
            single_command: SingleCommand::new(cluster, key, replica),
            bins,
            policy,
            record: None,
        }
    }

    /// Runs the command through [`SingleCommand::execute`] using this command's policy.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `SingleCommand::execute` returns.
    pub async fn execute(&mut self) -> Result<()> {
        SingleCommand::execute(self.policy, self).await
    }

    /// Decodes the response body held in `conn.buffer` into a [`Record`].
    ///
    /// First steps over `field_count` fields, then decodes `op_count` operations
    /// into named bin values. Nil values are dropped. When the same bin name
    /// occurs more than once, the values are gathered, in arrival order, into a
    /// single `Value::MultiResult`.
    ///
    /// # Errors
    ///
    /// Returns the error from reading a bin name or from `bytes_to_particle`.
    fn parse_record(
        &self,
        conn: &mut Connection,
        op_count: usize,
        field_count: usize,
        generation: u32,
        expiration: u32,
    ) -> Result<Record> {
        let mut bins: HashMap<String, Value> = HashMap::with_capacity(op_count);

        // Fields are not exposed to the caller; they are only stepped over.
        // NOTE(review): the size word is read with `read_u32(None)` and then
        // `4 + field_size` is skipped -- confirm against the buffer's cursor
        // semantics that the additional 4 bytes are intended.
        for _ in 0..field_count {
            let field_size = conn.buffer.read_u32(None) as usize;
            conn.buffer.skip(4 + field_size);
        }

        for _ in 0..op_count {
            let op_size = conn.buffer.read_u32(None) as usize;
            conn.buffer.skip(1);
            let particle_type = conn.buffer.read_u8(None);
            conn.buffer.skip(1);
            let name_size = conn.buffer.read_u8(None) as usize;
            let name: String = conn.buffer.read_str(name_size)?;

            // Four single bytes follow the size word above (two skipped, the
            // particle type and the name length); together with the name they
            // are subtracted to obtain the particle payload size.
            // NOTE(review): this subtraction is unchecked and would underflow
            // if `op_size < 4 + name_size`; verify the server guarantees it.
            let particle_bytes_size = op_size - (4 + name_size);
            let value = bytes_to_particle(particle_type, &mut conn.buffer, particle_bytes_size)?;

            if value.is_nil() {
                continue;
            }

            match bins.entry(name) {
                Vacant(slot) => {
                    slot.insert(value);
                }
                Occupied(slot) => {
                    let existing = slot.into_mut();
                    if let Value::MultiResult(list) = existing {
                        list.push(value);
                    } else {
                        // Move the stored value out instead of cloning it; the
                        // empty placeholder is overwritten on the next line.
                        let first =
                            std::mem::replace(existing, Value::MultiResult(Vec::new()));
                        *existing = Value::MultiResult(vec![first, value]);
                    }
                }
            }
        }

        Ok(Record::new(None, bins, generation, expiration))
    }
}
#[async_trait::async_trait]
impl Command for ReadCommand<'_> {
async fn write_timeout(&mut self, conn: &mut Connection) -> Result<()> {
conn.buffer.write_timeout(self.policy.server_timeout());
Ok(())
}
async fn write_buffer(&mut self, conn: &mut Connection) -> Result<()> {
conn.flush().await
}
async fn prepare_buffer(&mut self, conn: &mut Connection) -> Result<()> {
conn.buffer
.set_read(self.policy, self.single_command.key, &self.bins)
}
async fn get_node(&mut self) -> Result<Arc<Node>> {
self.single_command.get_node()
}
fn hint(&self) -> u8 {
self.single_command.hint()
}
fn can_retry(&mut self) -> bool {
true
}
fn can_recover_connection(&mut self) -> bool {
true
}
async fn parse_result(&mut self, conn: &mut Connection) -> Result<()> {
if let Err(err) = conn.read_header().await {
warn!("Parse result error: {err}");
return Err(err);
}
conn.buffer.reset_offset();
let sz = conn.buffer.read_u64(Some(0));
let header_length = conn.buffer.read_u8(Some(8));
let result_code = conn.buffer.read_u8(Some(13));
let generation = conn.buffer.read_u32(Some(14));
let expiration = conn.buffer.read_u32(Some(18));
let field_count = conn.buffer.read_u16(Some(26)) as usize; let op_count = conn.buffer.read_u16(Some(28)) as usize;
let receive_size = ((sz & 0xFFFF_FFFF_FFFF) - u64::from(header_length)) as usize;
if receive_size > 0 {
if let Err(err) = conn.read_body(receive_size).await {
warn!("Parse result error: {err}");
return Err(err);
}
}
match ResultCode::from(result_code) {
ResultCode::Ok => {
let record = if self.bins.is_none() {
Record::new(None, HashMap::new(), generation, expiration)
} else {
self.parse_record(conn, op_count, field_count, generation, expiration)?
};
self.record = Some(record);
Ok(())
}
ResultCode::UdfBadResponse => {
let record =
self.parse_record(conn, op_count, field_count, generation, expiration)?;
let reason = record
.bins
.get("FAILURE")
.map_or_else(|| String::from("UDF Error"), ToString::to_string);
Err(Error::UdfBadResponse(reason))
}
rc => Err(Error::ServerError(rc, false, conn.addr.clone())),
}
}
}