dynamo_runtime/storage/kv/
nats.rs1use std::{collections::HashMap, pin::Pin, time::Duration};
5
6use crate::{protocols::EndpointId, slug::Slug, storage::kv, transports::nats::Client};
7use async_nats::jetstream::kv::Operation;
8use async_trait::async_trait;
9use futures::StreamExt;
10
11use super::{Bucket, Store, StoreError, StoreOutcome};
12
13#[derive(Clone)]
14pub struct NATSStore {
15 client: Client,
16 endpoint: EndpointId,
17}
18
19pub struct NATSBucket {
20 nats_store: async_nats::jetstream::kv::Store,
21}
22
23#[async_trait]
24impl Store for NATSStore {
25 type Bucket = NATSBucket;
26
27 async fn get_or_create_bucket(
28 &self,
29 bucket_name: &str,
30 ttl: Option<Duration>,
31 ) -> Result<Self::Bucket, StoreError> {
32 let name = Slug::slugify(bucket_name);
33 let nats_store = self
34 .get_or_create_key_value(&self.endpoint.namespace, &name, ttl)
35 .await?;
36 Ok(NATSBucket { nats_store })
37 }
38
39 async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
40 let name = Slug::slugify(bucket_name);
41 match self.get_key_value(&self.endpoint.namespace, &name).await? {
42 Some(nats_store) => Ok(Some(NATSBucket { nats_store })),
43 None => Ok(None),
44 }
45 }
46
47 fn connection_id(&self) -> u64 {
48 self.client.client().server_info().client_id
49 }
50
51 fn shutdown(&self) {
52 }
55}
56
57impl NATSStore {
58 pub fn new(client: Client, endpoint: EndpointId) -> Self {
59 NATSStore { client, endpoint }
60 }
61
62 async fn get_or_create_key_value(
67 &self,
68 namespace: &str,
69 bucket_name: &Slug,
70 ttl: Option<Duration>,
72 ) -> Result<async_nats::jetstream::kv::Store, StoreError> {
73 if let Ok(Some(kv)) = self.get_key_value(namespace, bucket_name).await {
74 return Ok(kv);
75 }
76
77 let bucket_name = single_name(namespace, bucket_name);
80 let js = self.client.jetstream();
81 let create_result = js
82 .create_key_value(
83 async_nats::jetstream::kv::Config {
85 bucket: bucket_name.clone(),
86 max_age: ttl.unwrap_or_default(),
87 ..Default::default()
88 },
89 )
90 .await;
91 let nats_store = create_result
92 .map_err(|err| StoreError::KeyValueError(err.to_string(), bucket_name.clone()))?;
93 tracing::debug!("Created bucket {bucket_name}");
94 Ok(nats_store)
95 }
96
97 async fn get_key_value(
98 &self,
99 namespace: &str,
100 bucket_name: &Slug,
101 ) -> Result<Option<async_nats::jetstream::kv::Store>, StoreError> {
102 let bucket_name = single_name(namespace, bucket_name);
103 let js = self.client.jetstream();
104
105 use async_nats::jetstream::context::KeyValueErrorKind;
106 match js.get_key_value(&bucket_name).await {
107 Ok(store) => Ok(Some(store)),
108 Err(err) if err.kind() == KeyValueErrorKind::GetBucket => {
109 Ok(None)
111 }
112 Err(err) => Err(StoreError::KeyValueError(err.to_string(), bucket_name)),
113 }
114 }
115}
116
117#[async_trait]
118impl Bucket for NATSBucket {
119 async fn insert(
120 &self,
121 key: &kv::Key,
122 value: bytes::Bytes,
123 revision: u64,
124 ) -> Result<StoreOutcome, StoreError> {
125 if revision == 0 {
126 self.create(key, value).await
127 } else {
128 self.update(key, value, revision).await
129 }
130 }
131
132 async fn get(&self, key: &kv::Key) -> Result<Option<bytes::Bytes>, StoreError> {
133 self.nats_store
134 .get(key)
135 .await
136 .map_err(|e| StoreError::NATSError(e.to_string()))
137 }
138
139 async fn delete(&self, key: &kv::Key) -> Result<(), StoreError> {
140 self.nats_store
141 .delete(key)
142 .await
143 .map_err(|e| StoreError::NATSError(e.to_string()))
144 }
145
146 async fn watch(
147 &self,
148 ) -> Result<Pin<Box<dyn futures::Stream<Item = kv::WatchEvent> + Send + 'life0>>, StoreError>
149 {
150 let watch_stream = self
151 .nats_store
152 .watch_all()
153 .await
154 .map_err(|e| StoreError::NATSError(e.to_string()))?;
155 Ok(Box::pin(
157 watch_stream.filter_map(
158 |maybe_entry: Result<
159 async_nats::jetstream::kv::Entry,
160 async_nats::error::Error<_>,
161 >| async move {
162 match maybe_entry {
163 Ok(entry) => {
164 let key = kv::Key::new(entry.key);
165 Some(match entry.operation {
166 Operation::Put => {
167 let item = kv::KeyValue::new(key, entry.value);
168 kv::WatchEvent::Put(item)
169 }
170 Operation::Delete => kv::WatchEvent::Delete(key),
171 Operation::Purge => kv::WatchEvent::Delete(key),
173 })
174 }
175 Err(e) => {
176 tracing::error!(error=%e, "watch fatal err");
177 None
178 }
179 }
180 },
181 ),
182 ))
183 }
184
185 async fn entries(&self) -> Result<HashMap<kv::Key, bytes::Bytes>, StoreError> {
186 let mut key_stream = self
187 .nats_store
188 .keys()
189 .await
190 .map_err(|e| StoreError::NATSError(e.to_string()))?;
191 let mut out = HashMap::new();
192 while let Some(Ok(key)) = key_stream.next().await {
193 if let Ok(Some(entry)) = self.nats_store.entry(&key).await {
194 out.insert(kv::Key::new(key), entry.value);
195 }
196 }
197 Ok(out)
198 }
199}
200
201impl NATSBucket {
202 async fn create(&self, key: &kv::Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
203 match self.nats_store.create(&key, value).await {
204 Ok(revision) => Ok(StoreOutcome::Created(revision)),
205 Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => {
206 match self.nats_store.entry(key).await {
208 Ok(Some(entry)) => Ok(StoreOutcome::Exists(entry.revision)),
209 Ok(None) => {
210 tracing::error!(
211 %key,
212 "Race condition, key deleted between create and fetch. Retry."
213 );
214 Err(StoreError::Retry)
215 }
216 Err(err) => Err(StoreError::NATSError(err.to_string())),
217 }
218 }
219 Err(err) => Err(StoreError::NATSError(err.to_string())),
220 }
221 }
222
223 async fn update(
224 &self,
225 key: &kv::Key,
226 value: bytes::Bytes,
227 revision: u64,
228 ) -> Result<StoreOutcome, StoreError> {
229 match self.nats_store.update(key, value.clone(), revision).await {
230 Ok(revision) => Ok(StoreOutcome::Created(revision)),
231 Err(err)
232 if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision =>
233 {
234 tracing::warn!(revision, %key, "Update WrongLastRevision, resync");
235 self.resync_update(key, value).await
236 }
237 Err(err) => Err(StoreError::NATSError(err.to_string())),
238 }
239 }
240
241 async fn resync_update(
244 &self,
245 key: &kv::Key,
246 value: bytes::Bytes,
247 ) -> Result<StoreOutcome, StoreError> {
248 match self.nats_store.entry(key).await {
249 Ok(Some(entry)) => {
250 let next_rev = entry.revision + 1;
252 match self.nats_store.update(key, value, next_rev).await {
253 Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)),
254 Err(err) => Err(StoreError::NATSError(format!(
255 "Error during update of key {key} after resync: {err}"
256 ))),
257 }
258 }
259 Ok(None) => {
260 tracing::warn!(%key, "Entry does not exist during resync, creating.");
261 self.create(key, value).await
262 }
263 Err(err) => {
264 tracing::error!(%key, %err, "Failed fetching entry during resync");
265 Err(StoreError::NATSError(err.to_string()))
266 }
267 }
268 }
269}
270
271fn single_name(namespace: &str, name: &Slug) -> String {
274 format!("{namespace}_{name}")
275}