1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::convert::{TryFrom, TryInto};
use darkredis::{Command, ConnectionPool, Value};
use async_trait::async_trait;
use crate::{Error, Leaf, LeafDao, Result};
#[derive(Debug)]
pub struct RedisDao {
pool: darkredis::ConnectionPool,
}
impl TryFrom<Value> for Leaf {
type Error = Error;
fn try_from(value: Value) -> Result<Self> {
fn redis_value_to_isize(v: Value) -> Option<isize> {
match v {
Value::String(s) => String::from_utf8(s).ok()?.parse::<isize>().ok(),
Value::Integer(i) => Some(i),
_ => None,
}
}
let (tag, max_id, step) = value
.optional_array()
.and_then(|values| {
let mut values = values.into_iter();
let (tag, max_id, step) = (values.next()?, values.next()?, values.next()?);
Some((
redis_value_to_isize(tag)? as i32,
redis_value_to_isize(max_id)? as i64,
redis_value_to_isize(step)? as i32,
))
})
.ok_or(Error::SerializationError)?;
Ok(Self { tag, max_id, step })
}
}
#[async_trait]
impl LeafDao for RedisDao {
async fn leaves(&self) -> Result<Vec<Leaf>> {
let mut leaves = vec![];
for tag in self.tags().await? {
if let Ok(leaf) = self.leaf(tag).await {
leaves.push(leaf);
}
}
Ok(leaves)
}
async fn leaf(&self, tag: i32) -> Result<Leaf> {
let mut conn = self.pool.get().await;
let key = format!("leaf_alloc:{}", tag);
let command = Command::new("HMGET")
.arg(&key)
.arg(b"tag")
.arg(b"max_id")
.arg(b"step");
conn.run_command(command).await?.try_into()
}
async fn insert(&self, leaf: Leaf) -> Result<()> {
let mut conn = self.pool.get().await;
let key = format!("leaf_alloc:{}", leaf.tag).into_bytes();
let tag_bytes = leaf.tag.to_string().into_bytes();
let max_id_bytes = leaf.max_id.to_string().into_bytes();
let step_bytes = leaf.step.to_string().into_bytes();
let command = Command::new("HMSET")
.arg(&key)
.arg(b"tag")
.arg(&tag_bytes)
.arg(b"max_id")
.arg(&max_id_bytes)
.arg(b"step")
.arg(&step_bytes);
conn.run_command(command).await?;
Ok(())
}
async fn tags(&self) -> Result<Vec<i32>> {
let mut conn = self.pool.get().await;
let command = Command::new("KEYS").arg(b"leaf_alloc:*");
Ok(conn
.run_command(command)
.await?
.optional_array()
.map(|tags| {
tags.into_iter()
.map(|v| {
v.optional_string()
.and_then(|v| {
String::from_utf8(v.rsplit(|i| i.eq(&b':')).next()?.into()).ok()
})
.and_then(|s| s.parse::<i32>().ok())
})
.flatten()
.collect()
})
.unwrap_or(vec![]))
}
async fn update_max(&self, tag: i32) -> Result<Leaf> {
let step = self.leaf(tag).await.map(|l| l.step).unwrap_or(1000);
self.update_max_by_step(tag, step).await
}
async fn update_max_by_step(&self, tag: i32, step: i32) -> Result<Leaf> {
let mut conn = self.pool.get().await;
conn.hincrby(format!("leaf_alloc:{}", tag), b"max_id", step as isize)
.await?;
self.leaf(tag).await
}
}
impl RedisDao {
pub async fn new(address: impl Into<String>, password: Option<&str>) -> Result<Self> {
Ok(Self {
pool: ConnectionPool::create(address.into(), password, num_cpus::get()).await?,
})
}
}