deno_kv/
remote.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::rc::Rc;
6use std::sync::Arc;
7
8use anyhow::Context;
9use async_trait::async_trait;
10use bytes::Bytes;
11use deno_core::OpState;
12use deno_core::futures::Stream;
13use deno_error::JsErrorBox;
14use deno_fetch::CreateHttpClientOptions;
15use deno_fetch::create_http_client;
16use deno_permissions::PermissionCheckError;
17use deno_tls::Proxy;
18use deno_tls::RootCertStoreProvider;
19use deno_tls::TlsKeys;
20use deno_tls::rustls::RootCertStore;
21use denokv_remote::MetadataEndpoint;
22use denokv_remote::Remote;
23use denokv_remote::RemoteResponse;
24use denokv_remote::RemoteTransport;
25use http_body_util::BodyExt;
26use url::Url;
27
28use crate::DatabaseHandler;
29
30#[derive(Clone)]
31pub struct HttpOptions {
32  pub user_agent: String,
33  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
34  pub proxy: Option<Proxy>,
35  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
36  pub client_cert_chain_and_key: TlsKeys,
37}
38
39impl HttpOptions {
40  pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
41    Ok(match &self.root_cert_store_provider {
42      Some(provider) => Some(provider.get_or_try_init()?.clone()),
43      None => None,
44    })
45  }
46}
47
48pub trait RemoteDbHandlerPermissions {
49  fn check_env(&mut self, var: &str) -> Result<(), PermissionCheckError>;
50  fn check_net_url(
51    &mut self,
52    url: &Url,
53    api_name: &str,
54  ) -> Result<(), PermissionCheckError>;
55}
56
57impl RemoteDbHandlerPermissions for deno_permissions::PermissionsContainer {
58  #[inline(always)]
59  fn check_env(&mut self, var: &str) -> Result<(), PermissionCheckError> {
60    deno_permissions::PermissionsContainer::check_env(self, var)
61  }
62
63  #[inline(always)]
64  fn check_net_url(
65    &mut self,
66    url: &Url,
67    api_name: &str,
68  ) -> Result<(), PermissionCheckError> {
69    deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
70  }
71}
72
73pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
74  http_options: HttpOptions,
75  _p: std::marker::PhantomData<P>,
76}
77
78impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
79  pub fn new(http_options: HttpOptions) -> Self {
80    Self {
81      http_options,
82      _p: PhantomData,
83    }
84  }
85}
86
87pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
88  state: Rc<RefCell<OpState>>,
89  _permissions: PhantomData<P>,
90}
91
92impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> {
93  fn clone(&self) -> Self {
94    Self {
95      state: self.state.clone(),
96      _permissions: PhantomData,
97    }
98  }
99}
100
101impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
102  for PermissionChecker<P>
103{
104  fn check_net_url(&self, url: &Url) -> Result<(), JsErrorBox> {
105    let mut state = self.state.borrow_mut();
106    let permissions = state.borrow_mut::<P>();
107    permissions
108      .check_net_url(url, "Deno.openKv")
109      .map_err(JsErrorBox::from_err)
110  }
111}
112
113#[derive(Clone)]
114pub struct FetchClient(deno_fetch::Client);
115pub struct FetchResponse(http::Response<deno_fetch::ResBody>);
116
117impl RemoteTransport for FetchClient {
118  type Response = FetchResponse;
119  async fn post(
120    &self,
121    url: Url,
122    headers: http::HeaderMap,
123    body: Bytes,
124  ) -> Result<(Url, http::StatusCode, Self::Response), JsErrorBox> {
125    let body = deno_fetch::ReqBody::full(body);
126    let mut req = http::Request::new(body);
127    *req.method_mut() = http::Method::POST;
128    *req.uri_mut() =
129      url.as_str().parse().map_err(|e: http::uri::InvalidUri| {
130        JsErrorBox::type_error(e.to_string())
131      })?;
132    *req.headers_mut() = headers;
133
134    let res = self
135      .0
136      .clone()
137      .send(req)
138      .await
139      .map_err(JsErrorBox::from_err)?;
140    let status = res.status();
141    Ok((url, status, FetchResponse(res)))
142  }
143}
144
145impl RemoteResponse for FetchResponse {
146  async fn bytes(self) -> Result<Bytes, JsErrorBox> {
147    Ok(self.0.collect().await?.to_bytes())
148  }
149  fn stream(
150    self,
151  ) -> impl Stream<Item = Result<Bytes, JsErrorBox>> + Send + Sync {
152    self.0.into_body().into_data_stream()
153  }
154  async fn text(self) -> Result<String, JsErrorBox> {
155    let bytes = self.bytes().await?;
156    Ok(
157      std::str::from_utf8(&bytes)
158        .map_err(JsErrorBox::from_err)?
159        .into(),
160    )
161  }
162}
163
164#[async_trait(?Send)]
165impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler
166  for RemoteDbHandler<P>
167{
168  type DB = Remote<PermissionChecker<P>, FetchClient>;
169
170  async fn open(
171    &self,
172    state: Rc<RefCell<OpState>>,
173    path: Option<String>,
174  ) -> Result<Self::DB, JsErrorBox> {
175    const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
176
177    let Some(url) = path else {
178      return Err(JsErrorBox::type_error("Missing database url"));
179    };
180
181    let Ok(parsed_url) = Url::parse(&url) else {
182      return Err(JsErrorBox::type_error(format!(
183        "Invalid database url: {}",
184        url
185      )));
186    };
187
188    {
189      let mut state = state.borrow_mut();
190      let permissions = state.borrow_mut::<P>();
191      permissions
192        .check_env(ENV_VAR_NAME)
193        .map_err(JsErrorBox::from_err)?;
194      permissions
195        .check_net_url(&parsed_url, "Deno.openKv")
196        .map_err(JsErrorBox::from_err)?;
197    }
198
199    let access_token = std::env::var(ENV_VAR_NAME)
200      .map_err(anyhow::Error::from)
201      .with_context(|| {
202        "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
203      }).map_err(|e| JsErrorBox::generic(e.to_string()))?;
204
205    let metadata_endpoint = MetadataEndpoint {
206      url: parsed_url.clone(),
207      access_token: access_token.clone(),
208    };
209
210    let options = &self.http_options;
211    let client = create_http_client(
212      &options.user_agent,
213      CreateHttpClientOptions {
214        root_cert_store: options.root_cert_store()?,
215        ca_certs: vec![],
216        proxy: options.proxy.clone(),
217        dns_resolver: Default::default(),
218        unsafely_ignore_certificate_errors: options
219          .unsafely_ignore_certificate_errors
220          .clone(),
221        client_cert_chain_and_key: options
222          .client_cert_chain_and_key
223          .clone()
224          .try_into()
225          .unwrap(),
226        pool_max_idle_per_host: None,
227        pool_idle_timeout: None,
228        http1: false,
229        http2: true,
230        local_address: None,
231        client_builder_hook: None,
232      },
233    )
234    .map_err(JsErrorBox::from_err)?;
235    let fetch_client = FetchClient(client);
236
237    let permissions = PermissionChecker {
238      state: state.clone(),
239      _permissions: PhantomData,
240    };
241
242    let remote = Remote::new(fetch_client, permissions, metadata_endpoint);
243
244    Ok(remote)
245  }
246}