use std::collections::HashMap;
use std::ops::Deref;

use async_nats::{jetstream::kv::Store, Client};
use futures::{TryFutureExt, TryStreamExt};
use serde::de::DeserializeOwned;
use tracing::{debug, error};

use super::{
    delete_link, ld_hash_raw, put_link, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX, SUBJECT_KEY,
};
use crate::{types::LinkDefinition, Result};

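/// A [`KvStore`] implementation that reads directly from the lattice metadata
/// bucket in NATS JetStream on every call, with no caching layer in between.
///
/// A minimal usage sketch (the NATS address and the `"default"` lattice prefix
/// are assumptions; adjust them for your deployment):
///
/// ```ignore
/// let client = async_nats::connect("127.0.0.1:4222").await?;
/// let store = DirectKvStore::new(client, "default", None).await?;
/// let links = store.get_links().await?;
/// ```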
#[derive(Clone, Debug)]
pub struct DirectKvStore {
    store: Store,
}

impl AsRef<Store> for DirectKvStore {
    fn as_ref(&self) -> &Store {
        &self.store
    }
}

impl Deref for DirectKvStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        &self.store
    }
}

impl DirectKvStore {
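    /// Creates a new store backed by the KV bucket for the given lattice
    /// prefix. A JetStream domain may be supplied to reach a bucket that lives
    /// in another (e.g. leaf node) domain; `None` uses the default JetStream
    /// context.
    ///
    /// A sketch with an explicit domain (the `"leaf"` domain name is an
    /// assumption for illustration):
    ///
    /// ```ignore
    /// let store = DirectKvStore::new(client, "default", Some("leaf".to_string())).await?;
    /// ```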
    pub async fn new(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self> {
        super::get_kv_store(nc, lattice_prefix, js_domain)
            .await
            .map(|store| Self { store })
    }

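    /// Lists all keys in the bucket, concurrently fetches the values for keys
    /// matching `filter_prefix`, and deserializes them, logging and skipping
    /// entries that fail to deserialize.
    ///
    /// For example (an internal call mirroring `get_links` below):
    ///
    /// ```ignore
    /// let links: Vec<LinkDefinition> = self.fetch_data(LINKDEF_PREFIX).await?;
    /// ```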
    async fn fetch_data<T: DeserializeOwned>(&self, filter_prefix: &str) -> Result<Vec<T>> {
        // NOTE(thomastaylor312): This is a fairly gnarly mapping thing, but we're trying to avoid
        // iterating over all the keys multiple times. I have heavily annotated the code below to
        // try and make it more clear what's going on.

        // The first thing here is a big block that lists all keys in the store. That list is a `Stream`
        // that returns `Result`s, so we have to use the `TryStreamExt` methods here to iterate over
        // each key.
        let futs = self
            .store
            .keys()
            // Why is there a `?` here, you might ask? Because `keys` might _also_ return an error
            // when constructing the stream. The `Ok` result here contains the actual stream we're
            // going to use.
            .await?
            // Only process keys that have the right prefix. We use `futures::future::ready` here
            // because `try_filter` expects the predicate to return a future, and `ready` gets
            // around data ownership issues
            .try_filter(|key| futures::future::ready(key.starts_with(filter_prefix)))
            // For the remaining keys, we want to map _all_ `Ok` responses to a future that will
            // fetch the data from the key
            .map_ok(|k| {
                self.store
                    .get(k)
                    // If we get an ok response from the `get` call, we want to map that to the
                    // serialized data
                    .map_ok(|maybe_bytes| {
                        maybe_bytes.map(|bytes| serde_json::from_slice::<T>(&bytes))
                    })
                    // Explicitly annotate the error conversion here because it is hard to do
                    // later on and this is complex enough that the compiler can't infer it
                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
            })
            // Read through the whole stream of keys and collect all the constructed futures into a vec
            .try_collect::<Vec<_>>()
            // We collect into a `Result` here so that if there was an error when streaming the
            // keys, we can return early
            .await?;

        // Now that we have a vec of futures, we can use `join_all` to run them all concurrently.
        let data: Vec<T> = futures::future::join_all(futs)
            .await
            .into_iter()
            // Because `store.get` returns `Ok(None)` if the key doesn't exist (i.e. the key was
            // deleted between listing and fetching), we transpose the `Result<Option<_>>` to
            // `Option<Result<_>>` and let the `filter_map` drop the `None` case
            .filter_map(|res| res.transpose())
            // We have to collect once here to surface the outer `Result`, which is an error if any
            // of the `store.get` calls failed. Unfortunately that means we need to collect to a
            // `Vec` and then re-iterate over it, but it was the only way to handle the error
            .collect::<std::result::Result<Vec<_>, _>>()?
            .into_iter()
            // We don't actually care if an entry is malformed, but we do need to log the error so
            // that it can be fixed. This is probably the best trade-off here: a failure returned at
            // this point would _definitely_ be noticeable, but we don't want things screeching to a
            // halt because of a single (or even a few) malformed entries
            .filter_map(|res| match res {
                Ok(v) => Some(v),
                Err(e) => {
                    error!(error = %e, "failed to deserialize data, skipping entry");
                    None
                }
            })
            // Tha...tha...tha...that's all folks!
            .collect();
        Ok(data)
    }

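    /// Fetches and deserializes a single entry by its full key, returning
    /// `Ok(None)` if the key does not exist.
    ///
    /// For example (an internal call mirroring `get_claims` below):
    ///
    /// ```ignore
    /// let claims: Option<HashMap<String, String>> =
    ///     self.fetch_single(format!("{CLAIMS_PREFIX}{id}")).await?;
    /// ```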
    async fn fetch_single<T: DeserializeOwned>(&self, key: String) -> Result<Option<T>> {
        self.store
            .get(key)
            .await?
            .map(|bytes| serde_json::from_slice::<T>(&bytes))
            .transpose()
            .map_err(|e| e.into())
    }

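    /// Returns all claims whose subject starts with the given nkey prefix
    /// character ('V' for providers and 'M' for actors, per the trait impl
    /// below). Claims missing a subject key are logged at debug level and
    /// dropped.
    ///
    /// ```ignore
    /// let provider_claims = self.filter_claims('V').await?;
    /// ```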
    async fn filter_claims(&self, key_prefix: char) -> Result<Vec<HashMap<String, String>>> {
        Ok(self
            .get_all_claims()
            .await?
            .into_iter()
            .filter_map(|claims| match claims.get(SUBJECT_KEY) {
                Some(subject) if subject.starts_with(key_prefix) => Some(claims),
                None => {
                    debug!(?claims, "claims missing subject key");
                    None
                }
                _ => None,
            })
            .collect())
    }
}

#[async_trait::async_trait]
impl KvStore for DirectKvStore {
    async fn get_links(&self) -> Result<Vec<LinkDefinition>> {
        self.fetch_data(LINKDEF_PREFIX).await
    }
    async fn get_all_claims(&self) -> Result<Vec<HashMap<String, String>>> {
        self.fetch_data(CLAIMS_PREFIX).await
    }
    async fn get_provider_claims(&self) -> Result<Vec<HashMap<String, String>>> {
        self.filter_claims('V').await
    }
    async fn get_actor_claims(&self) -> Result<Vec<HashMap<String, String>>> {
        self.filter_claims('M').await
    }
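    /// Fetches all links and keeps only those for which `filter_fn` returns
    /// `true`. A hedged sketch, assuming `LinkDefinition` exposes a
    /// `contract_id` field (the contract ID string here is an assumption for
    /// illustration):
    ///
    /// ```ignore
    /// let http_links = store
    ///     .get_filtered_links(|ld| ld.contract_id == "wasmcloud:httpserver")
    ///     .await?;
    /// ```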
    async fn get_filtered_links<F>(&self, filter_fn: F) -> Result<Vec<LinkDefinition>>
    where
        F: FnMut(&LinkDefinition) -> bool + Send,
    {
        self.get_links().await.map(|links| {
            links
                .into_iter()
                .filter(filter_fn)
                .collect::<Vec<LinkDefinition>>()
        })
    }

    async fn get_link(
        &self,
        actor_id: &str,
        link_name: &str,
        contract_id: &str,
    ) -> Result<Option<LinkDefinition>> {
        self.fetch_single(format!(
            "{LINKDEF_PREFIX}{}",
            ld_hash_raw(actor_id, contract_id, link_name)
        ))
        .await
    }

    async fn get_claims(&self, id: &str) -> Result<Option<HashMap<String, String>>> {
        self.fetch_single(format!("{CLAIMS_PREFIX}{id}")).await
    }
    async fn put_link(&self, ld: LinkDefinition) -> Result<()> {
        put_link(&self.store, &ld).await
    }
    async fn delete_link(&self, actor_id: &str, contract_id: &str, link_name: &str) -> Result<()> {
        delete_link(&self.store, actor_id, contract_id, link_name).await
    }
}