Skip to main content

dynamo_runtime/storage/kv/
nats.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashMap, pin::Pin, time::Duration};
5
6use crate::{protocols::EndpointId, slug::Slug, storage::kv, transports::nats::Client};
7use async_nats::jetstream::kv::Operation;
8use async_trait::async_trait;
9use futures::StreamExt;
10
11use super::{Bucket, Store, StoreError, StoreOutcome};
12
/// Key-value [`Store`] implementation backed by NATS JetStream.
///
/// Every bucket it opens or creates is named `{namespace}_{bucket}`, where the namespace comes
/// from `endpoint.namespace`.
#[derive(Clone)]
pub struct NATSStore {
    /// NATS connection; its JetStream context is used to look up and create buckets.
    client: Client,
    /// Endpoint whose `namespace` prefixes every bucket name.
    endpoint: EndpointId,
}
18
/// One NATS JetStream key-value bucket, as handed out by [`NATSStore`].
pub struct NATSBucket {
    /// Handle to the underlying JetStream key-value store.
    nats_store: async_nats::jetstream::kv::Store,
}
22
23#[async_trait]
24impl Store for NATSStore {
25    type Bucket = NATSBucket;
26
27    async fn get_or_create_bucket(
28        &self,
29        bucket_name: &str,
30        ttl: Option<Duration>,
31    ) -> Result<Self::Bucket, StoreError> {
32        let name = Slug::slugify(bucket_name);
33        let nats_store = self
34            .get_or_create_key_value(&self.endpoint.namespace, &name, ttl)
35            .await?;
36        Ok(NATSBucket { nats_store })
37    }
38
39    async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
40        let name = Slug::slugify(bucket_name);
41        match self.get_key_value(&self.endpoint.namespace, &name).await? {
42            Some(nats_store) => Ok(Some(NATSBucket { nats_store })),
43            None => Ok(None),
44        }
45    }
46
47    fn connection_id(&self) -> u64 {
48        self.client.client().server_info().client_id
49    }
50
51    fn shutdown(&self) {
52        // TODO: Track and delete any owned keys
53        // The TTL should ensure NATS does it, but best we do it immediately
54    }
55}
56
57impl NATSStore {
58    pub fn new(client: Client, endpoint: EndpointId) -> Self {
59        NATSStore { client, endpoint }
60    }
61
62    /// Get or create a key-value store (aka bucket) in NATS.
63    ///
64    /// ttl is only used if we are creating the bucket, so if that has
65    /// changed first delete the bucket.
66    async fn get_or_create_key_value(
67        &self,
68        namespace: &str,
69        bucket_name: &Slug,
70        // Delete entries older than this
71        ttl: Option<Duration>,
72    ) -> Result<async_nats::jetstream::kv::Store, StoreError> {
73        if let Ok(Some(kv)) = self.get_key_value(namespace, bucket_name).await {
74            return Ok(kv);
75        }
76
77        // It doesn't exist, create it
78
79        let bucket_name = single_name(namespace, bucket_name);
80        let js = self.client.jetstream();
81        let create_result = js
82            .create_key_value(
83                // TODO: configure the bucket, probably need to pass some of these values in
84                async_nats::jetstream::kv::Config {
85                    bucket: bucket_name.clone(),
86                    max_age: ttl.unwrap_or_default(),
87                    ..Default::default()
88                },
89            )
90            .await;
91        let nats_store = create_result
92            .map_err(|err| StoreError::KeyValueError(err.to_string(), bucket_name.clone()))?;
93        tracing::debug!("Created bucket {bucket_name}");
94        Ok(nats_store)
95    }
96
97    async fn get_key_value(
98        &self,
99        namespace: &str,
100        bucket_name: &Slug,
101    ) -> Result<Option<async_nats::jetstream::kv::Store>, StoreError> {
102        let bucket_name = single_name(namespace, bucket_name);
103        let js = self.client.jetstream();
104
105        use async_nats::jetstream::context::KeyValueErrorKind;
106        match js.get_key_value(&bucket_name).await {
107            Ok(store) => Ok(Some(store)),
108            Err(err) if err.kind() == KeyValueErrorKind::GetBucket => {
109                // bucket doesn't exist
110                Ok(None)
111            }
112            Err(err) => Err(StoreError::KeyValueError(err.to_string(), bucket_name)),
113        }
114    }
115}
116
117#[async_trait]
118impl Bucket for NATSBucket {
119    async fn insert(
120        &self,
121        key: &kv::Key,
122        value: bytes::Bytes,
123        revision: u64,
124    ) -> Result<StoreOutcome, StoreError> {
125        if revision == 0 {
126            self.create(key, value).await
127        } else {
128            self.update(key, value, revision).await
129        }
130    }
131
132    async fn get(&self, key: &kv::Key) -> Result<Option<bytes::Bytes>, StoreError> {
133        self.nats_store
134            .get(key)
135            .await
136            .map_err(|e| StoreError::NATSError(e.to_string()))
137    }
138
139    async fn delete(&self, key: &kv::Key) -> Result<(), StoreError> {
140        self.nats_store
141            .delete(key)
142            .await
143            .map_err(|e| StoreError::NATSError(e.to_string()))
144    }
145
146    async fn watch(
147        &self,
148    ) -> Result<Pin<Box<dyn futures::Stream<Item = kv::WatchEvent> + Send + 'life0>>, StoreError>
149    {
150        let watch_stream = self
151            .nats_store
152            .watch_all()
153            .await
154            .map_err(|e| StoreError::NATSError(e.to_string()))?;
155        // Map the `Entry` to `Entry.value` which is Bytes of the stored value.
156        Ok(Box::pin(
157            watch_stream.filter_map(
158                |maybe_entry: Result<
159                    async_nats::jetstream::kv::Entry,
160                    async_nats::error::Error<_>,
161                >| async move {
162                    match maybe_entry {
163                        Ok(entry) => {
164                            let key = kv::Key::new(entry.key);
165                            Some(match entry.operation {
166                                Operation::Put => {
167                                    let item = kv::KeyValue::new(key, entry.value);
168                                    kv::WatchEvent::Put(item)
169                                }
170                                Operation::Delete => kv::WatchEvent::Delete(key),
171                                // TODO: What is Purge? Not urgent, NATS impl not used
172                                Operation::Purge => kv::WatchEvent::Delete(key),
173                            })
174                        }
175                        Err(e) => {
176                            tracing::error!(error=%e, "watch fatal err");
177                            None
178                        }
179                    }
180                },
181            ),
182        ))
183    }
184
185    async fn entries(&self) -> Result<HashMap<kv::Key, bytes::Bytes>, StoreError> {
186        let mut key_stream = self
187            .nats_store
188            .keys()
189            .await
190            .map_err(|e| StoreError::NATSError(e.to_string()))?;
191        let mut out = HashMap::new();
192        while let Some(Ok(key)) = key_stream.next().await {
193            if let Ok(Some(entry)) = self.nats_store.entry(&key).await {
194                out.insert(kv::Key::new(key), entry.value);
195            }
196        }
197        Ok(out)
198    }
199}
200
201impl NATSBucket {
202    async fn create(&self, key: &kv::Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
203        match self.nats_store.create(&key, value).await {
204            Ok(revision) => Ok(StoreOutcome::Created(revision)),
205            Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => {
206                // key exists, get the revsion
207                match self.nats_store.entry(key).await {
208                    Ok(Some(entry)) => Ok(StoreOutcome::Exists(entry.revision)),
209                    Ok(None) => {
210                        tracing::error!(
211                            %key,
212                            "Race condition, key deleted between create and fetch. Retry."
213                        );
214                        Err(StoreError::Retry)
215                    }
216                    Err(err) => Err(StoreError::NATSError(err.to_string())),
217                }
218            }
219            Err(err) => Err(StoreError::NATSError(err.to_string())),
220        }
221    }
222
223    async fn update(
224        &self,
225        key: &kv::Key,
226        value: bytes::Bytes,
227        revision: u64,
228    ) -> Result<StoreOutcome, StoreError> {
229        match self.nats_store.update(key, value.clone(), revision).await {
230            Ok(revision) => Ok(StoreOutcome::Created(revision)),
231            Err(err)
232                if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision =>
233            {
234                tracing::warn!(revision, %key, "Update WrongLastRevision, resync");
235                self.resync_update(key, value).await
236            }
237            Err(err) => Err(StoreError::NATSError(err.to_string())),
238        }
239    }
240
241    /// We have the wrong revision for a key. Fetch it's entry to get the correct revision,
242    /// and try the update again.
243    async fn resync_update(
244        &self,
245        key: &kv::Key,
246        value: bytes::Bytes,
247    ) -> Result<StoreOutcome, StoreError> {
248        match self.nats_store.entry(key).await {
249            Ok(Some(entry)) => {
250                // Re-try the update with new version number
251                let next_rev = entry.revision + 1;
252                match self.nats_store.update(key, value, next_rev).await {
253                    Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)),
254                    Err(err) => Err(StoreError::NATSError(format!(
255                        "Error during update of key {key} after resync: {err}"
256                    ))),
257                }
258            }
259            Ok(None) => {
260                tracing::warn!(%key, "Entry does not exist during resync, creating.");
261                self.create(key, value).await
262            }
263            Err(err) => {
264                tracing::error!(%key, %err, "Failed fetching entry during resync");
265                Err(StoreError::NATSError(err.to_string()))
266            }
267        }
268    }
269}
270
271/// async-nats won't let us use a multi-part subject to create KV buckets (and probably many other
272/// things).
273fn single_name(namespace: &str, name: &Slug) -> String {
274    format!("{namespace}_{name}")
275}