use std::str;
use std::sync::Arc;
use aerospike_rt::Mutex;
use crate::cluster::Node;
use crate::commands::{Command, SingleCommand, StreamCommand};
use crate::errors::Result;
use crate::net::Connection;
use crate::policy::QueryPolicy;
use crate::query::NodePartitions;
use crate::{Bins, Recordset};
/// State for one scan command, executed through `SingleCommand::execute`.
///
/// The borrowed fields (`policy`, `namespace`, `set_name`) share the lifetime
/// `'a`, so the command cannot outlive the values it was constructed from.
pub struct ScanCommand<'a> {
// Streaming helper built in `new` from the node, recordset and node
// partitions; node lookup and result parsing are delegated to it.
stream_command: StreamCommand,
// Policy passed both to `SingleCommand::execute` and to `set_scan`.
policy: &'a QueryPolicy,
// Namespace forwarded to `set_scan` when the request buffer is prepared.
namespace: &'a str,
// Set name forwarded to `set_scan` when the request buffer is prepared.
set_name: &'a str,
// Bin selection forwarded (by reference) to `set_scan`.
bins: Bins,
}
impl<'a> ScanCommand<'a> {
    /// Builds a scan command from the given policy, namespace, set name and bins.
    ///
    /// The node is cloned out of `node_partitions` while its lock is briefly
    /// held; the lock is released again before the inner `StreamCommand` is
    /// constructed from the node, `recordset` and `node_partitions`.
    pub async fn new(
        policy: &'a QueryPolicy,
        namespace: &'a str,
        set_name: &'a str,
        bins: Bins,
        recordset: Arc<Recordset>,
        node_partitions: Arc<Mutex<NodePartitions>>,
    ) -> Self {
        // The guard is a temporary of this statement, so it is dropped before
        // `node_partitions` is moved into the stream command below.
        let node = node_partitions.lock().await.node.clone();
        let stream_command = StreamCommand::new(node, recordset, node_partitions, true);

        Self {
            stream_command,
            policy,
            namespace,
            set_name,
            bins,
        }
    }

    /// Runs this command via `SingleCommand::execute` using the stored policy.
    ///
    /// Returns whatever result `SingleCommand::execute` produces.
    pub async fn execute(&mut self) -> Result<()> {
        let policy = self.policy;
        SingleCommand::execute(policy, self).await
    }
}
#[async_trait::async_trait]
impl Command for ScanCommand<'_> {
    /// Writes the recordset tracker's server timeout into the connection buffer.
    async fn write_timeout(&mut self, conn: &mut Connection) -> Result<()> {
        let tracker = &self.stream_command.recordset.tracker;
        // The tracker guard only lives for this statement; it is released
        // before the buffer is touched.
        let server_timeout = tracker.lock().await.server_timeout();
        conn.buffer.write_timeout(server_timeout);
        Ok(())
    }

    /// Flushes the connection and returns the flush result.
    async fn write_buffer(&mut self, conn: &mut Connection) -> Result<()> {
        conn.flush().await
    }

    /// Fills the connection buffer with the scan request via `set_scan`.
    ///
    /// The node-partitions lock is held for the duration of the `set_scan` call.
    async fn prepare_buffer(&mut self, conn: &mut Connection) -> Result<()> {
        let partitions = self.stream_command.node_partitions.lock().await;
        // Read the task id only after the lock has been acquired.
        let task_id = self.stream_command.recordset.task_id();
        conn.buffer
            .set_scan(
                self.policy,
                self.namespace,
                self.set_name,
                &self.bins,
                task_id,
                &partitions,
            )
            .await
    }

    /// Delegates node lookup to the inner stream command.
    async fn get_node(&mut self) -> Result<Arc<Node>> {
        let stream = &mut self.stream_command;
        stream.get_node().await
    }

    /// Always returns `0`.
    fn hint(&self) -> u8 {
        0
    }

    /// Always returns `false`.
    fn can_retry(&mut self) -> bool {
        false
    }

    /// Always returns `false`.
    fn can_recover_connection(&mut self) -> bool {
        false
    }

    /// Delegates response parsing to `StreamCommand::parse_result`.
    async fn parse_result(&mut self, conn: &mut Connection) -> Result<()> {
        let stream = &mut self.stream_command;
        StreamCommand::parse_result(stream, conn).await
    }
}