atomic_env_rs/
lib.rs

1use std::{collections::HashMap, sync::Arc};
2
/// Key of an environment entry: an owned, fixed-length byte string.
type Key = Box<[u8]>;
/// Value of an environment entry: an owned, fixed-length byte string.
type Value = Box<[u8]>;
5
/// Owner-side handle to the shared key-value state.
///
/// Runners that operate on the state are obtained through `Env::split`.
pub struct Env {
    /// Reference-counted shared state; each runner created by `split` receives a clone of
    /// this `Arc`, so all of them point at the same entries.
    inner: Arc<EnvInner>,
}
9
/// State shared (behind an `Arc`) by an `Env`, its runners and every `Ctx` they create.
struct EnvInner {
    /// The committed key-value entries, stored in a concurrent map.
    // NOTE(review): `entires` looks like a typo for `entries`; renaming it touches every
    // use site in this file, so it is left unchanged here.
    entires: dashmap::DashMap<Key, Value, fxhash::FxBuildHasher>,
}
13
/// Handle used to run queries against the shared state.
///
/// The handle is `Clone`; every clone refers to the same underlying state.
#[derive(Clone)]
pub struct QueryRunner {
    /// Shared state the queries read from.
    env: Arc<EnvInner>,
}
18
/// Handle used to run updates whose buffered writes are committed to the shared state.
///
/// Unlike `QueryRunner`, this type does not derive `Clone`.
pub struct UpdateRunner {
    /// Shared state the updates read from and commit into.
    env: Arc<EnvInner>,
}
22
/// Execution context handed to a query or update closure.
///
/// Writes are buffered in `batch` instead of touching the shared state directly; reads
/// consult `batch` first and fall back to the shared state.
pub struct Ctx {
    /// Shared state used as the fallback for reads.
    env: Arc<EnvInner>,
    /// Pending operations recorded by this context, at most one per key (a later operation
    /// on the same key replaces the earlier one).
    batch: HashMap<Key, Operation, fxhash::FxBuildHasher>,
}
27
/// A pending change to a single key, recorded in a `Ctx` batch.
enum Operation {
    /// Delete the key when the batch is committed; reads through the same context see the
    /// key as absent.
    Remove,
    /// Store the given value under the key when the batch is committed; reads through the
    /// same context see this value.
    Insert(Value),
}
32
33impl Env {
34    /// Create a new environment.
35    pub fn new() -> Self {
36        Env {
37            inner: Arc::new(EnvInner {
38                entires: Default::default(),
39            }),
40        }
41    }
42
43    /// Split the environment to the two query runner object and the update runner to
44    /// run queries and updates.
45    pub fn split(&mut self) -> (QueryRunner, UpdateRunner) {
46        (
47            QueryRunner::new(self.inner.clone()),
48            UpdateRunner::new(self.inner.clone()),
49        )
50    }
51}
52
53impl QueryRunner {
54    pub(crate) fn new(inner: Arc<EnvInner>) -> Self {
55        Self { env: inner }
56    }
57
58    /// Run a query on the latest state, the context passed to the closure is not going to contain
59    /// any intermediary changes happening in other threads.
60    pub fn run<F, T>(&self, job: F) -> T
61    where
62        F: Fn(&mut Ctx) -> T,
63    {
64        let mut ctx = Ctx::new(self.env.clone());
65        job(&mut ctx)
66    }
67}
68
69impl UpdateRunner {
70    pub(crate) fn new(inner: Arc<EnvInner>) -> Self {
71        Self { env: inner }
72    }
73
74    /// Run an update on the latest state. There can only be one update running on an environment.
75    pub fn run<F, T>(&mut self, job: F) -> T
76    where
77        F: Fn(&mut Ctx) -> T,
78    {
79        let mut ctx = Ctx::new(self.env.clone());
80        let result = job(&mut ctx);
81
82        // commit the batch.
83        // TOOD: Skip the commit if the execution failed.
84        self.commit(ctx.batch);
85
86        result
87    }
88
89    fn commit(&mut self, batch: HashMap<Key, Operation, fxhash::FxBuildHasher>) {
90        for (key, operation) in batch {
91            match operation {
92                Operation::Remove => {
93                    self.env.entires.remove(&key);
94                }
95                Operation::Insert(value) => {
96                    self.env.entires.insert(key, value);
97                }
98            }
99        }
100    }
101}
102
103impl Ctx {
104    pub(crate) fn new(inner: Arc<EnvInner>) -> Self {
105        Self {
106            env: inner,
107            batch: Default::default(),
108        }
109    }
110
111    pub fn get(&self, key: &[u8]) -> Option<Value> {
112        match self.batch.get(key) {
113            Some(Operation::Remove) => None,
114            Some(Operation::Insert(value)) => Some(value.clone()),
115            None => self.env.entires.get(key).map(|c| c.clone()),
116        }
117    }
118
119    pub fn set(&mut self, key: Key, value: Value) {
120        self.batch.insert(key, Operation::Insert(value));
121    }
122}
123
124// --
125
126pub fn mint(ctx: &mut Ctx, account: [u8; 32], amount: u128) {
127    let current_balance = balance(ctx, account);
128    let new_balance = current_balance + amount;
129    let new_balance_le_bytes = new_balance.to_le_bytes().to_vec().into_boxed_slice();
130    ctx.set(account.into(), new_balance_le_bytes);
131}
132
133pub fn balance(ctx: &mut Ctx, account: [u8; 32]) -> u128 {
134    ctx.get(&account)
135        .map(|v| {
136            let slice = arrayref::array_ref![v, 0, 16];
137            u128::from_le_bytes(*slice)
138        })
139        .unwrap_or(0)
140}
141
142pub fn transfer(ctx: &mut Ctx, from: [u8; 32], to: [u8; 32], amount: u128) {
143    // Transfer some funds from the `from` account into the destination account
144}
145
/// Walks through the query/update workflow and checks the balances observed on the main
/// thread: a mint run as a query is discarded, a mint run as an update is committed.
#[test]
fn demo() {
    const ACCOUNT: [u8; 32] = [0; 32];

    let mut env = Env::new();
    let (query, mut update) = env.split();

    let b = query.run(|ctx| balance(ctx, ACCOUNT));
    println!("Current balance = {b}");
    assert_eq!(b, 0, "a fresh environment must not hold any balance");

    query.run(|ctx| mint(ctx, ACCOUNT, 1000));
    println!("Called mint as a query.");

    let b = query.run(|ctx| balance(ctx, ACCOUNT));
    println!("Current balance = {b}");
    assert_eq!(b, 0, "writes made inside a query must not be committed");

    let q = query.clone();
    let (sender, recv) = std::sync::mpsc::channel::<()>();
    let handle = std::thread::spawn(move || {
        println!("running query...");

        sender.send(()).unwrap();

        // Long-running query overlapping with the update below. Its reads race with the
        // commit on the main thread, so they are only printed, not asserted.
        q.run(|ctx| {
            dbg!(balance(ctx, ACCOUNT));

            std::thread::sleep(std::time::Duration::new(2, 0));
            println!("woke up!");
            let b = balance(ctx, ACCOUNT);
            println!("balance = {b}");
        });
    });

    // Wait until the background thread is about to start its query.
    recv.recv().unwrap();

    update.run(|ctx| mint(ctx, ACCOUNT, 1000));
    println!("Called mint as a update.");

    let b = query.run(|ctx| balance(ctx, ACCOUNT));
    println!("Current balance = {b}");
    assert_eq!(b, 1000, "writes made inside an update must be committed");

    handle.join().unwrap();
}