use std::sync::Arc;
use std::time::Duration;
use crate::cluster::Node;
use crate::commands::{Command, SingleCommand, StreamCommand};
use crate::errors::Result;
use crate::net::Connection;
use crate::policy::QueryPolicy;
use crate::{Recordset, Statement};
pub struct QueryCommand<'a> {
stream_command: StreamCommand,
policy: &'a QueryPolicy,
statement: Arc<Statement>,
partitions: Vec<u16>,
}
impl<'a> QueryCommand<'a> {
pub fn new(
policy: &'a QueryPolicy,
node: Arc<Node>,
statement: Arc<Statement>,
recordset: Arc<Recordset>,
partitions: Vec<u16>,
) -> Self {
QueryCommand {
stream_command: StreamCommand::new(node, recordset),
policy,
statement,
partitions,
}
}
pub fn execute(&mut self) -> Result<()> {
SingleCommand::execute(self.policy, self)
}
}
impl<'a> Command for QueryCommand<'a> {
fn write_timeout(&mut self, conn: &mut Connection, timeout: Option<Duration>) -> Result<()> {
conn.buffer.write_timeout(timeout);
Ok(())
}
fn write_buffer(&mut self, conn: &mut Connection) -> Result<()> {
conn.flush()
}
fn prepare_buffer(&mut self, conn: &mut Connection) -> Result<()> {
conn.buffer.set_query(
self.policy,
&self.statement,
false,
self.stream_command.recordset.task_id(),
&self.partitions,
)
}
fn get_node(&self) -> Result<Arc<Node>> {
self.stream_command.get_node()
}
fn parse_result(&mut self, conn: &mut Connection) -> Result<()> {
StreamCommand::parse_result(&mut self.stream_command, conn)
}
}