pub use redis;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt, io,
iter::Iterator,
marker::Unpin,
mem,
pin::Pin,
sync::Arc,
task::{self, Poll},
time::Duration,
};
use crc16::*;
use futures::{
channel::{mpsc, oneshot},
future::{self, BoxFuture},
prelude::*,
ready, stream,
};
use log::trace;
use pin_project_lite::pin_project;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use redis::{
aio::ConnectionLike, Cmd, ConnectionAddr, ConnectionInfo, ErrorKind, IntoConnectionInfo,
RedisError, RedisFuture, RedisResult, Value,
};
/// Number of hash slots. `slot_for_key` reduces the key's CRC16 modulo this value and
/// `Pipeline::build_slot_map` requires the reported ranges to cover exactly `0..SLOT_SIZE`.
const SLOT_SIZE: usize = 16384;
/// Retry limit that `Client::open` installs on a freshly created client.
const DEFAULT_RETRIES: u32 = 16;
/// Entry point of the library: remembers the seed nodes and the retry limit that are handed to
/// every `Connection` it creates.
pub struct Client {
// Seed nodes used to establish the first connections; unix-socket addresses are rejected by
// `Client::open`.
initial_nodes: Vec<ConnectionInfo>,
// Per-request retry limit. With `None` the retry-count check in `Request::poll_request` never
// triggers.
retries: Option<u32>,
}
impl Client {
pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<Client> {
let mut nodes = Vec::with_capacity(initial_nodes.len());
for info in initial_nodes {
let info = info.into_connection_info()?;
if let ConnectionAddr::Unix(_) = *info.addr {
return Err(RedisError::from((ErrorKind::InvalidClientConfig,
"This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
}
nodes.push(info);
}
Ok(Client {
initial_nodes: nodes,
retries: Some(DEFAULT_RETRIES),
})
}
pub fn set_retries(&mut self, retries: Option<u32>) -> &mut Self {
self.retries = retries;
self
}
pub async fn get_connection(&self) -> RedisResult<Connection> {
Connection::new(&self.initial_nodes, self.retries).await
}
#[doc(hidden)]
pub async fn get_generic_connection<C>(&self) -> RedisResult<Connection<C>>
where
C: ConnectionLike + Connect + Clone + Send + Unpin + 'static,
{
Connection::new(&self.initial_nodes, self.retries).await
}
}
/// Cloneable handle to a cluster connection.
///
/// It only wraps the sending half of a channel; `Connection::new` forwards the receiving half
/// into a `Pipeline` sink running on a spawned task.
#[derive(Clone)]
pub struct Connection<C = redis::aio::MultiplexedConnection>(mpsc::Sender<Message<C>>);
impl<C> Connection<C>
where
    C: ConnectionLike + Connect + Clone + Send + Unpin + 'static,
{
    /// Builds the `Pipeline` for the given seed nodes and, once that succeeded, spawns a task
    /// that forwards every queued `Message` into it.
    ///
    /// The returned handle holds the sender of a channel with a buffer of 100 messages. Nothing
    /// is spawned when the pipeline cannot be created; the error is returned instead.
    async fn new(
        initial_nodes: &[ConnectionInfo],
        retries: Option<u32>,
    ) -> RedisResult<Connection<C>> {
        let pipeline = Pipeline::new(initial_nodes, retries).await?;
        let (tx, rx) = mpsc::channel::<Message<_>>(100);
        let forward_task = rx.map(Ok).forward(pipeline).map(|_| ());
        tokio::spawn(forward_task);
        Ok(Connection(tx))
    }
}
/// Maps the last slot of each slot range to the address of the master serving that range
/// (see `Pipeline::build_slot_map`); the owner of a slot is the first entry with a key `>= slot`.
type SlotMap = BTreeMap<u16, String>;
/// Sink that receives `Message`s, routes them to node connections and retries them when needed.
struct Pipeline<C> {
// Open connections keyed by their `redis://...` address string.
connections: HashMap<String, C>,
// Current slot-to-master mapping.
slots: SlotMap,
// Whether requests are being served (`PollComplete`) or a slot refresh is running (`Recover`).
state: ConnectionState<C>,
// Requests accepted by `start_send` that have not been answered yet.
in_flight_requests:
Vec<Pin<Box<Request<BoxFuture<'static, (String, RedisResult<Response>)>, Response, C>>>>,
// Retry limit copied into every new `Request`.
retries: Option<u32>,
}
/// A queued unit of work together with the function pointer that runs it on a connection.
#[derive(Clone)]
enum CmdArg<C> {
// A single command; `func` receives the connection and the command.
Cmd {
cmd: Arc<redis::Cmd>,
func: fn(C, Arc<redis::Cmd>) -> RedisFuture<'static, Response>,
},
// A pipeline; `offset` and `count` are passed through to `func` unchanged.
Pipeline {
pipeline: Arc<redis::Pipeline>,
offset: usize,
count: usize,
func: fn(C, Arc<redis::Pipeline>, usize, usize) -> RedisFuture<'static, Response>,
},
}
impl<C> CmdArg<C> {
fn exec(&self, con: C) -> RedisFuture<'static, Response> {
match self {
Self::Cmd { cmd, func } => func(con, cmd.clone()),
Self::Pipeline {
pipeline,
offset,
count,
func,
} => func(con, pipeline.clone(), *offset, *count),
}
}
fn slot(&self) -> Option<u16> {
fn get_cmd_arg(cmd: &Cmd, arg_num: usize) -> Option<&[u8]> {
cmd.args_iter().nth(arg_num).and_then(|arg| match arg {
redis::Arg::Simple(arg) => Some(arg),
redis::Arg::Cursor => None,
})
}
fn slot_for_command(cmd: &Cmd) -> Option<u16> {
match get_cmd_arg(cmd, 0) {
Some(b"EVAL") | Some(b"EVALSHA") => {
get_cmd_arg(cmd, 2).and_then(|key_count_bytes| {
let key_count_res = std::str::from_utf8(key_count_bytes)
.ok()
.and_then(|key_count_str| key_count_str.parse::<usize>().ok());
key_count_res.and_then(|key_count| {
if key_count > 0 {
get_cmd_arg(cmd, 3).map(|key| slot_for_key(key))
} else {
None
}
})
})
}
Some(b"SCRIPT") => {
None
}
_ => get_cmd_arg(cmd, 1).map(|key| slot_for_key(key)),
}
}
match self {
Self::Cmd { cmd, .. } => slot_for_command(cmd),
Self::Pipeline { pipeline, .. } => {
let mut iter = pipeline.cmd_iter();
let slot = iter.next().map(slot_for_command)?;
for cmd in iter {
if slot != slot_for_command(cmd) {
return None;
}
}
slot
}
}
}
}
/// Result payload travelling back to the caller: one value for a single command, a list of
/// values for a pipeline.
enum Response {
Single(Value),
Multiple(Vec<Value>),
}
/// What a `Connection` sends to the `Pipeline`: the work to do and the one-shot channel on which
/// the result is delivered.
struct Message<C> {
cmd: CmdArg<C>,
sender: oneshot::Sender<RedisResult<Response>>,
}
/// State of the `Pipeline` as seen by `poll_flush`.
enum ConnectionState<C> {
// Normal operation: in-flight requests are polled.
PollComplete,
// A slot refresh is running; the future yields the new slot map and connection map.
Recover(RedisFuture<'static, (SlotMap, HashMap<String, C>)>),
}
/// Prints only the variant name; the payload of `Recover` is an opaque future.
impl<C> fmt::Debug for ConnectionState<C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let variant = match self {
            ConnectionState::PollComplete => "PollComplete",
            ConnectionState::Recover(_) => "Recover",
        };
        f.write_str(variant)
    }
}
/// Routing data of one request.
struct RequestInfo<C> {
// The command or pipeline to execute.
cmd: CmdArg<C>,
// Slot computed once in `start_send`; `None` means "any node".
slot: Option<u16>,
// Addresses that already failed for this request; `try_request` avoids them when picking a
// random connection.
excludes: HashSet<String>,
}
// Execution state of a `Request`:
// - `None`: no attempt has been started (or it was reset and must be restarted),
// - `Future`: an attempt is running,
// - `Sleep`: waiting out a back-off delay before the next attempt.
pin_project! {
#[project = RequestStateProj]
enum RequestState<F> {
None,
Future {
#[pin]
future: F,
},
Sleep {
#[pin]
sleep: tokio::time::Sleep,
},
}
}
// One in-flight request: how often it was retried, the retry limit, the channel used to answer
// the caller (taken on the first response), its routing data and its current execution state.
pin_project! {
struct Request<F, I, C> {
retry: u32,
max_retries: Option<u32>,
sender: Option<oneshot::Sender<RedisResult<I>>>,
info: RequestInfo<C>,
#[pin]
future: RequestState<F>,
}
}
/// What `Pipeline::poll_flush` has to do with a request after `Request::poll_request` completed.
#[must_use]
enum Next {
// Start a new attempt (a fresh future is created with `try_request`).
TryNewConnection,
// The caller has been answered; the request can be dropped.
Done,
}
impl<F, I, C> Request<F, I, C>
where
F: Future<Output = (String, RedisResult<I>)>,
C: ConnectionLike,
{
/// Advances the request by polling its current attempt or back-off timer.
///
/// `connections_len` is the number of connections the pipeline currently holds; once that
/// many addresses have failed for this request it is answered with the last error.
///
/// Resolves to:
/// * `Ok(Next::Done)` — the caller was answered (with a value or with a final error);
/// * `Ok(Next::TryNewConnection)` — a new attempt should be started;
/// * `Err(err)` — the server replied `MOVED` or `ASK`; the caller is *not* answered, and
///   `Pipeline::poll_flush` reacts to this by refreshing the slot map.
///
/// # Panics
///
/// Panics when called while the state is `RequestState::None`.
fn poll_request(
mut self: Pin<&mut Self>,
cx: &mut task::Context,
connections_len: usize,
) -> Poll<Result<Next, RedisError>> {
let mut this = self.as_mut().project();
let future = match this.future.as_mut().project() {
RequestStateProj::Future { future } => future,
RequestStateProj::Sleep { sleep } => {
// Back-off finished: ask for a new attempt.
return Ok(match ready!(sleep.poll(cx)) {
() => Next::TryNewConnection,
})
.into();
}
_ => panic!("Request future must be Some"),
};
match ready!(future.poll(cx)) {
(_, Ok(item)) => {
trace!("Ok");
self.respond(Ok(item));
Ok(Next::Done).into()
}
(addr, Err(err)) => {
trace!("Request error {}", err);
// Retry budget used up: hand the error to the caller.
match this.max_retries {
Some(max_retries) if this.retry == max_retries => {
self.respond(Err(err));
return Ok(Next::Done).into();
}
_ => (),
}
*this.retry = (*this.retry).saturating_add(1);
if let Some(error_code) = err.code() {
if error_code == "MOVED" || error_code == "ASK" {
// Forget earlier failures and surface the error to the pipeline.
this.info.excludes.clear();
return Err(err).into();
} else if error_code == "TRYAGAIN" || error_code == "CLUSTERDOWN" {
// Back off for 2^n * 10 ms, where n is the retry count clamped to 7..=16
// (i.e. between 1_280 ms and 655_360 ms).
let sleep_duration =
Duration::from_millis(2u64.pow((*this.retry).max(7).min(16)) * 10);
this.info.excludes.clear();
this.future.set(RequestState::Sleep {
sleep: tokio::time::sleep(sleep_duration),
});
// Re-enter so that the freshly created sleep is polled right away.
return self.poll_request(cx, connections_len);
}
}
// Any other error: avoid this address next time, or give up once every known
// connection has been excluded.
this.info.excludes.insert(addr);
if this.info.excludes.len() >= connections_len {
self.respond(Err(err));
return Ok(Next::Done).into();
}
Ok(Next::TryNewConnection).into()
}
}
}
/// Sends `msg` to the caller. A failed send (the receiver is gone) is ignored.
///
/// # Panics
///
/// Panics when a response was already sent for this request.
fn respond(self: Pin<&mut Self>, msg: RedisResult<I>) {
let _ = self
.project()
.sender
.take()
.expect("Result should only be sent once")
.send(msg);
}
}
impl<C> Pipeline<C>
where
C: ConnectionLike + Connect + Clone + Send + 'static,
{
/// Connects to the seed nodes, then performs one slot refresh and installs its result.
///
/// Fails when no seed node is reachable or when the slot refresh fails.
async fn new(initial_nodes: &[ConnectionInfo], retries: Option<u32>) -> RedisResult<Self> {
let connections = Self::create_initial_connections(initial_nodes).await?;
let mut connection = Pipeline {
connections,
slots: Default::default(),
in_flight_requests: Vec::new(),
state: ConnectionState::PollComplete,
retries,
};
let (slots, connections) = connection.refresh_slots().await?;
connection.slots = slots;
connection.connections = connections;
Ok(connection)
}
/// Tries to connect to every seed node and returns the connections that succeeded, keyed by
/// `redis://[:password@]host:port`.
///
/// Nodes that cannot be connected (or fail the `PING` check) are skipped silently.
///
/// # Errors
///
/// `ErrorKind::IoError` when not a single connection could be established.
///
/// # Panics
///
/// Panics for any address that is not `ConnectionAddr::Tcp`.
async fn create_initial_connections(
initial_nodes: &[ConnectionInfo],
) -> RedisResult<HashMap<String, C>> {
stream::iter(initial_nodes)
.then(|info| {
let addr = match *info.addr {
ConnectionAddr::Tcp(ref host, port) => match &info.passwd {
Some(pw) => format!("redis://:{}@{}:{}", pw, host, port),
None => format!("redis://{}:{}", host, port),
},
_ => panic!("No reach."),
};
connect_and_check(info.clone()).map(|result| match result {
Ok(conn) => Some((addr, conn)),
Err(_) => None,
})
})
.fold(
HashMap::with_capacity(initial_nodes.len()),
|mut connections: HashMap<String, C>, conn: Option<(String, C)>| async move {
// `extend` with an `Option` inserts the pair only when it is `Some`.
connections.extend(conn);
connections
},
)
.map(|connections| {
if connections.len() == 0 {
return Err(RedisError::from((
ErrorKind::IoError,
"Failed to create initial connections",
)));
}
Ok(connections)
})
.await
}
/// Returns a future that fetches a new slot map and builds the matching connection map.
///
/// The current connections are moved out of `self` (an empty map is left behind) and are
/// owned by the returned future. Each connection is asked for the slot layout until one
/// yields a valid map; if all fail, the last error is returned. The new connection map holds
/// one entry per distinct master address: an existing connection is reused when it still
/// answers `PING`, otherwise a new connection is attempted, and addresses that cannot be
/// reached are left out.
///
/// NOTE(review): when this is called while `self.connections` is empty (e.g. from
/// `poll_flush` right after a refresh failed and dropped the moved-out connections), the
/// loop body never runs and the future resolves to `Ok` with an empty slot map and an empty
/// connection map. `get_random_connection` panics on an empty map — confirm whether this
/// path needs a reconnect to the seed nodes.
fn refresh_slots(
&mut self,
) -> impl Future<Output = RedisResult<(SlotMap, HashMap<String, C>)>> {
let mut connections = mem::replace(&mut self.connections, Default::default());
async move {
let mut result = Ok(SlotMap::new());
// Stop at the first node that returns a usable slot layout.
for (addr, conn) in connections.iter_mut() {
match get_slots(addr, &mut *conn)
.await
.and_then(|v| Self::build_slot_map(v))
{
Ok(s) => {
result = Ok(s);
break;
}
Err(err) => result = Err(err),
}
}
let slots = result?;
let new_connections = HashMap::with_capacity(connections.len());
// Walk the master addresses, moving healthy connections from the old map to the new one.
let (_, connections) = stream::iter(slots.values())
.fold(
(connections, new_connections),
move |(mut connections, mut new_connections), addr| async move {
if !new_connections.contains_key(addr) {
let new_connection = if let Some(mut conn) = connections.remove(addr) {
match check_connection(&mut conn).await {
Ok(_) => Some((addr.to_string(), conn)),
Err(_) => match connect_and_check(addr.as_ref()).await {
Ok(conn) => Some((addr.to_string(), conn)),
Err(_) => None,
},
}
} else {
match connect_and_check(addr.as_ref()).await {
Ok(conn) => Some((addr.to_string(), conn)),
Err(_) => None,
}
};
new_connections.extend(new_connection);
}
(connections, new_connections)
},
)
.await;
Ok((slots, connections))
}
}
/// Validates the slot ranges reported by a node and turns them into a `SlotMap`.
///
/// The ranges are sorted by start and must follow each other without gap or overlap,
/// beginning at slot 0 and ending at `SLOT_SIZE - 1`.
///
/// # Errors
///
/// `ErrorKind::ResponseError` when a range does not start where the previous one ended, or
/// when the ranges do not end exactly at `SLOT_SIZE`.
fn build_slot_map(mut slots_data: Vec<Slot>) -> RedisResult<SlotMap> {
slots_data.sort_by_key(|slot_data| slot_data.start);
// `prev_end` is the slot the next range is expected to start at.
let last_slot = slots_data.iter().try_fold(0, |prev_end, slot_data| {
if prev_end != slot_data.start() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!(
"Received overlapping slots {} and {}..{}",
prev_end, slot_data.start, slot_data.end
),
)));
}
Ok(slot_data.end() + 1)
})?;
if usize::from(last_slot) != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!("Lacks the slots >= {}", last_slot),
)));
}
// Key each range by its last slot so that `range(slot..)` finds the owning range.
let slot_map = slots_data
.iter()
.map(|slot_data| (slot_data.end(), slot_data.master().to_string()))
.collect();
trace!("{:?}", slot_map);
Ok(slot_map)
}
/// Resolves the connection that should serve `slot`.
///
/// * The slot has an owner with an open connection: that connection is returned immediately.
/// * The owner has no open connection: a new connection to it is attempted; if that fails, a
///   random existing connection (picked before the attempt) is used instead. The new
///   connection is returned to the caller only; it is not stored in `self.connections`.
/// * The slot map has no entry for the slot: a random connection is returned.
fn get_connection(&self, slot: u16) -> impl Future<Output = (String, C)> + 'static {
if let Some((_, addr)) = self.slots.range(&slot..).next() {
if self.connections.contains_key(addr) {
return future::Either::Left(future::ready((
addr.clone(),
self.connections.get(addr).unwrap().clone(),
)));
}
let random_conn = get_random_connection(&self.connections, None);
let addr = addr.clone();
future::Either::Right(async move {
let result = connect_and_check(addr.as_ref()).await;
result
.map(|conn| (addr, conn))
.unwrap_or_else(|_| random_conn)
})
} else {
future::Either::Left(future::ready(get_random_connection(
&self.connections,
None,
)))
}
}
/// Starts one attempt of the request described by `info`.
///
/// A random connection outside `info.excludes` is used when earlier attempts failed or when
/// the request has no slot; otherwise the slot owner is used. The returned future yields the
/// address that was used together with the command's result.
fn try_request(
&self,
info: &RequestInfo<C>,
) -> impl Future<Output = (String, RedisResult<Response>)> {
let cmd = info.cmd.clone();
(if info.excludes.len() > 0 || info.slot.is_none() {
future::Either::Left(future::ready(get_random_connection(
&self.connections,
Some(&info.excludes),
)))
} else {
future::Either::Right(self.get_connection(info.slot.unwrap()))
})
.then(move |(addr, conn)| cmd.exec(conn).map(|result| (addr, result)))
}
}
/// `Sink` implementation: `start_send` queues a request, `poll_flush` drives all queued requests
/// and the slot-refresh state machine.
impl<C> Sink<Message<C>> for Pipeline<C>
where
C: ConnectionLike + Connect + Clone + Send + Unpin + 'static,
{
type Error = ();
/// Always ready: requests are queued without any limit enforced here.
fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
/// Wraps `msg` into a `Request` (slot computed once, no excluded addresses, retry count 0,
/// state `None`) and appends it to the in-flight list. No attempt is started here.
fn start_send(mut self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
trace!("start_send");
let cmd = msg.cmd;
let excludes = HashSet::new();
let slot = cmd.slot();
let info = RequestInfo {
cmd,
slot,
excludes,
};
let request = Request {
max_retries: self.retries,
retry: 0,
sender: Some(msg.sender),
future: RequestState::None,
info,
};
self.in_flight_requests.push(Box::pin(request));
Ok(()).into()
}
/// Drives the pipeline until all in-flight requests are answered.
///
/// In the `Recover` state the slot refresh is polled: on success the new slots and
/// connections are installed, on failure another refresh is started immediately. In the
/// `PollComplete` state every in-flight request is polled once per pass; a request that
/// reports an error through `Err` (see `Request::poll_request`) switches the pipeline to
/// `Recover` after the pass.
///
/// Returns `Ready(Ok(()))` only when no request is in flight.
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<(), Self::Error>> {
trace!("poll_complete: {:?}", self.state);
loop {
// The state is taken out (leaving `PollComplete`) and the match yields the next state.
self.state = match mem::replace(&mut self.state, ConnectionState::PollComplete) {
ConnectionState::Recover(mut future) => match future.as_mut().poll(cx) {
Poll::Ready(Ok((slots, connections))) => {
trace!("Recovered with {} connections!", connections.len());
self.slots = slots;
self.connections = connections;
ConnectionState::PollComplete
}
Poll::Pending => {
// Put the unfinished refresh back before yielding.
self.state = ConnectionState::Recover(future);
trace!("Recover not ready");
return Poll::Pending;
}
Poll::Ready(Err(_err)) => {
ConnectionState::Recover(Box::pin(self.refresh_slots()))
}
},
ConnectionState::PollComplete => {
let mut error = None;
let mut i = 0;
while i < self.in_flight_requests.len() {
// Start the first attempt of requests that have none running.
if let RequestState::None = self.in_flight_requests[i].future {
let future = self.try_request(&self.in_flight_requests[i].info);
self.in_flight_requests[i].as_mut().project().future.set(
RequestState::Future {
future: Box::pin(future),
},
);
}
let self_ = &mut *self;
match self_.in_flight_requests[i]
.as_mut()
.poll_request(cx, self_.connections.len())
{
Poll::Pending => {
i += 1;
}
Poll::Ready(result) => match result {
Ok(next) => match next {
// `swap_remove` moves the last request into slot `i`, so `i` is
// deliberately not advanced in the two arms below.
Next::Done => {
self.in_flight_requests.swap_remove(i);
}
Next::TryNewConnection => {
// Re-queue at the end with a fresh attempt; it is polled later in
// this same pass.
let mut request = self.in_flight_requests.swap_remove(i);
let mut r = request.as_mut().project();
r.future.set(RequestState::Future {
future: Box::pin(self.try_request(&r.info)),
});
self.in_flight_requests.push(request);
}
},
Err(err) => {
// Keep the request, reset it so it is restarted after recovery, and
// remember that a refresh is needed.
error = Some(err);
self.in_flight_requests[i]
.as_mut()
.project()
.future
.set(RequestState::None);
i += 1;
}
},
}
}
if let Some(err) = error {
trace!("Recovering {}", err);
ConnectionState::Recover(Box::pin(self.refresh_slots()))
} else if self.in_flight_requests.is_empty() {
return Ok(()).into();
} else {
return Poll::Pending;
}
}
}
}
}
/// Closing is the same as flushing: it completes once no request is in flight.
fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
impl<C> ConnectionLike for Connection<C>
where
C: ConnectionLike + Send + 'static,
{
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
trace!("req_packed_command");
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Cmd {
cmd: Arc::new(cmd.clone()),
func: |mut conn, cmd| {
Box::pin(async move {
conn.req_packed_command(&cmd).map_ok(Response::Single).await
})
},
},
sender,
})
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
))
})
.await?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
)))
})
.map(|response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
})
})
}
fn req_packed_commands<'a>(
&'a mut self,
pipeline: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>> {
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Pipeline {
pipeline: Arc::new(pipeline.clone()),
offset,
count,
func: |mut conn, pipeline, offset, count| {
Box::pin(async move {
conn.req_packed_commands(&pipeline, offset, count)
.map_ok(Response::Multiple)
.await
})
},
},
sender,
})
.map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
.await?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
})
.map(|response| match response {
Response::Multiple(values) => values,
Response::Single(_) => unreachable!(),
})
})
}
fn get_db(&self) -> i64 {
0
}
}
impl Clone for Client {
fn clone(&self) -> Client {
Client::open(self.initial_nodes.clone()).unwrap()
}
}
/// Shorthand for "a future resolving to `RedisResult<T>`", usable in `impl Trait` position.
/// The blanket impl below makes every such future implement it.
trait ImplRedisFuture<T>: Future<Output = RedisResult<T>> {}
impl<T, F> ImplRedisFuture<T> for F where F: Future<Output = RedisResult<T>> {}
/// Abstraction over "open a connection to one node", so the cluster logic can be used with
/// connection types other than `redis::aio::MultiplexedConnection`.
pub trait Connect: Sized {
/// Connects to the node described by `info`.
fn connect<'a, T>(info: T) -> RedisFuture<'a, Self>
where
T: IntoConnectionInfo + Send + 'a;
}
impl Connect for redis::aio::MultiplexedConnection {
    /// Opens a `redis::Client` for `info` and obtains a multiplexed tokio connection from it.
    ///
    /// Fails when `info` cannot be converted into connection information, when the client
    /// cannot be created, or when the connection attempt itself fails.
    fn connect<'a, T>(info: T) -> RedisFuture<'a, redis::aio::MultiplexedConnection>
    where
        T: IntoConnectionInfo + Send + 'a,
    {
        Box::pin(async move {
            let client = redis::Client::open(info.into_connection_info()?)?;
            client.get_multiplexed_tokio_connection().await
        })
    }
}
/// Connects to the node described by `info` and verifies the connection with `check_connection`
/// before handing it out.
///
/// The connect future is created immediately; the returned future resolves to the connection,
/// or to the first error raised while connecting or checking.
fn connect_and_check<'a, T, C>(info: T) -> impl ImplRedisFuture<C> + 'a
where
    T: IntoConnectionInfo + Send + 'a,
    C: ConnectionLike + Connect + Send + 'static,
{
    let connecting = C::connect(info);
    async move {
        let mut conn = connecting.await?;
        check_connection(&mut conn).await?;
        Ok(conn)
    }
}
/// Health check: sends `PING` over `conn` and succeeds when the reply can be read as a string.
async fn check_connection<C>(conn: &mut C) -> RedisResult<()>
where
    C: ConnectionLike + Send + 'static,
{
    let mut ping = Cmd::new();
    ping.arg("PING");
    let reply = ping.query_async::<_, String>(conn).await;
    reply.map(|_pong| ())
}
/// Picks a random connection and returns its address together with a clone of it.
///
/// Addresses listed in `excludes` are skipped, unless that would leave nothing to choose from
/// (the exclude set is at least as large as the connection map), in which case the exclusions
/// are ignored.
///
/// # Panics
///
/// Panics when no candidate is available, e.g. when `connections` is empty.
fn get_random_connection<'a, C>(
    connections: &'a HashMap<String, C>,
    excludes: Option<&'a HashSet<String>>,
) -> (String, C)
where
    C: Clone,
{
    debug_assert!(!connections.is_empty());
    let mut rng = thread_rng();
    // Only honour the exclusions while they leave at least one connection.
    let effective_excludes = excludes.filter(|skip| skip.len() < connections.len());
    let picked = match effective_excludes {
        Some(skip) => connections
            .keys()
            .filter(|candidate| !skip.contains(*candidate))
            .choose(&mut rng),
        None => connections.keys().choose(&mut rng),
    };
    let addr = picked.expect("No targets to choose from");
    let conn = connections.get(addr).unwrap().clone();
    (addr.to_string(), conn)
}
/// Computes the hash slot of `key`: CRC16 (XMODEM) of the part selected by `sub_key`, reduced
/// modulo `SLOT_SIZE`.
fn slot_for_key(key: &[u8]) -> u16 {
    let hashed_part = sub_key(key);
    let checksum = State::<XMODEM>::calculate(hashed_part);
    checksum % SLOT_SIZE as u16
}
/// Returns the part of `key` that takes part in slot hashing.
///
/// When the key contains a `{`, and the first `}` after it encloses at least one byte, only the
/// bytes between the two are returned. In every other case (no `{`, no `}` after it, or an empty
/// `{}`) the whole key is returned.
fn sub_key(key: &[u8]) -> &[u8] {
    let open = match key.iter().position(|&byte| byte == b'{') {
        Some(index) => index,
        None => return key,
    };
    let after_open = &key[open + 1..];
    match after_open.iter().position(|&byte| byte == b'}') {
        Some(tag_len) if tag_len > 0 => &after_open[..tag_len],
        _ => key,
    }
}
/// One slot range as parsed by `get_slots`, with the nodes serving it.
#[derive(Debug)]
struct Slot {
// First slot of the range.
start: u16,
// Last slot of the range (`build_slot_map` expects the next range to start at `end + 1`).
end: u16,
// Address of the first node listed for the range.
master: String,
// Addresses of the remaining nodes listed for the range.
replicas: Vec<String>,
}
/// Read-only accessors for `Slot`.
impl Slot {
/// First slot of the range.
pub fn start(&self) -> u16 {
self.start
}
/// Last slot of the range.
pub fn end(&self) -> u16 {
self.end
}
/// Address of the node stored as master of the range.
pub fn master(&self) -> &str {
&self.master
}
/// Addresses of the nodes stored as replicas of the range.
#[allow(dead_code)]
pub fn replicas(&self) -> &Vec<String> {
&self.replicas
}
}
async fn get_slots<C>(addr: &str, connection: &mut C) -> RedisResult<Vec<Slot>>
where
C: ConnectionLike,
{
trace!("get_slots");
let mut cmd = Cmd::new();
cmd.arg("CLUSTER").arg("SLOTS");
let value = connection
.req_packed_command(&cmd)
.map_err(|err| {
trace!("get_slots error: {}", err);
err
})
.await?;
trace!("get_slots -> {:#?}", value);
let mut result = Vec::with_capacity(2);
if let Value::Bulk(items) = value {
let password = get_password(addr);
let mut iter = items.into_iter();
while let Some(Value::Bulk(item)) = iter.next() {
if item.len() < 3 {
continue;
}
let start = if let Value::Int(start) = item[0] {
start as u16
} else {
continue;
};
let end = if let Value::Int(end) = item[1] {
end as u16
} else {
continue;
};
let mut nodes: Vec<String> = item
.into_iter()
.skip(2)
.filter_map(|node| {
if let Value::Bulk(node) = node {
if node.len() < 2 {
return None;
}
let ip = if let Value::Data(ref ip) = node[0] {
String::from_utf8_lossy(ip)
} else {
return None;
};
let port = if let Value::Int(port) = node[1] {
port
} else {
return None;
};
match &password {
Some(pw) => Some(format!("redis://:{}@{}:{}", pw, ip, port)),
None => Some(format!("redis://{}:{}", ip, port)),
}
} else {
None
}
})
.collect();
if nodes.len() < 1 {
continue;
}
let replicas = nodes.split_off(1);
result.push(Slot {
start,
end,
master: nodes.pop().unwrap(),
replicas,
});
}
}
Ok(result)
}
/// Extracts the password component of a redis URL, or `None` when `addr` is not a valid redis
/// URL or carries no password.
fn get_password(addr: &str) -> Option<String> {
    let url = redis::parse_redis_url(addr).ok()?;
    url.password().map(String::from)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot of the key carried by a RESP-encoded command, or `None` when no key can be extracted.
    fn slot_for_packed_command(cmd: &[u8]) -> Option<u16> {
        let key = command_key(cmd)?;
        let hashed_part = sub_key(&key);
        Some(State::<XMODEM>::calculate(hashed_part) % SLOT_SIZE as u16)
    }

    /// Parses a RESP-encoded command and returns its second element (the key) when the command
    /// is a bulk of at least two elements and that element is binary data.
    fn command_key(cmd: &[u8]) -> Option<Vec<u8>> {
        let mut args = match redis::parse_redis_value(cmd).ok()? {
            Value::Bulk(args) => args,
            _ => return None,
        };
        if args.len() < 2 {
            return None;
        }
        match args.swap_remove(1) {
            Value::Data(key) => Some(key),
            _ => None,
        }
    }

    #[test]
    fn slot() {
        assert_eq!(
            slot_for_packed_command(&[
                42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
                244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
            ]),
            Some(964)
        );
        assert_eq!(
            slot_for_packed_command(&[
                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
                197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52,
                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
            ]),
            Some(8352)
        );
        assert_eq!(
            slot_for_packed_command(&[
                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233,
                247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52,
                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
            ]),
            Some(5210),
        );
    }
}