1use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::rc::Rc;
6use std::sync::Arc;
7
8use anyhow::Context;
9use async_trait::async_trait;
10use bytes::Bytes;
11use deno_core::OpState;
12use deno_core::futures::Stream;
13use deno_error::JsErrorBox;
14use deno_fetch::CreateHttpClientOptions;
15use deno_fetch::create_http_client;
16use deno_permissions::PermissionCheckError;
17use deno_tls::Proxy;
18use deno_tls::RootCertStoreProvider;
19use deno_tls::TlsKeys;
20use deno_tls::rustls::RootCertStore;
21use denokv_remote::MetadataEndpoint;
22use denokv_remote::Remote;
23use denokv_remote::RemoteResponse;
24use denokv_remote::RemoteTransport;
25use http_body_util::BodyExt;
26use url::Url;
27
28use crate::DatabaseHandler;
29
30#[derive(Clone)]
31pub struct HttpOptions {
32 pub user_agent: String,
33 pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
34 pub proxy: Option<Proxy>,
35 pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
36 pub client_cert_chain_and_key: TlsKeys,
37}
38
39impl HttpOptions {
40 pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
41 Ok(match &self.root_cert_store_provider {
42 Some(provider) => Some(provider.get_or_try_init()?.clone()),
43 None => None,
44 })
45 }
46}
47
48pub trait RemoteDbHandlerPermissions {
49 fn check_env(&mut self, var: &str) -> Result<(), PermissionCheckError>;
50 fn check_net_url(
51 &mut self,
52 url: &Url,
53 api_name: &str,
54 ) -> Result<(), PermissionCheckError>;
55}
56
57impl RemoteDbHandlerPermissions for deno_permissions::PermissionsContainer {
58 #[inline(always)]
59 fn check_env(&mut self, var: &str) -> Result<(), PermissionCheckError> {
60 deno_permissions::PermissionsContainer::check_env(self, var)
61 }
62
63 #[inline(always)]
64 fn check_net_url(
65 &mut self,
66 url: &Url,
67 api_name: &str,
68 ) -> Result<(), PermissionCheckError> {
69 deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
70 }
71}
72
73pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
74 http_options: HttpOptions,
75 _p: std::marker::PhantomData<P>,
76}
77
78impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
79 pub fn new(http_options: HttpOptions) -> Self {
80 Self {
81 http_options,
82 _p: PhantomData,
83 }
84 }
85}
86
87pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
88 state: Rc<RefCell<OpState>>,
89 _permissions: PhantomData<P>,
90}
91
92impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> {
93 fn clone(&self) -> Self {
94 Self {
95 state: self.state.clone(),
96 _permissions: PhantomData,
97 }
98 }
99}
100
101impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
102 for PermissionChecker<P>
103{
104 fn check_net_url(&self, url: &Url) -> Result<(), JsErrorBox> {
105 let mut state = self.state.borrow_mut();
106 let permissions = state.borrow_mut::<P>();
107 permissions
108 .check_net_url(url, "Deno.openKv")
109 .map_err(JsErrorBox::from_err)
110 }
111}
112
113#[derive(Clone)]
114pub struct FetchClient(deno_fetch::Client);
115pub struct FetchResponse(http::Response<deno_fetch::ResBody>);
116
117impl RemoteTransport for FetchClient {
118 type Response = FetchResponse;
119 async fn post(
120 &self,
121 url: Url,
122 headers: http::HeaderMap,
123 body: Bytes,
124 ) -> Result<(Url, http::StatusCode, Self::Response), JsErrorBox> {
125 let body = deno_fetch::ReqBody::full(body);
126 let mut req = http::Request::new(body);
127 *req.method_mut() = http::Method::POST;
128 *req.uri_mut() =
129 url.as_str().parse().map_err(|e: http::uri::InvalidUri| {
130 JsErrorBox::type_error(e.to_string())
131 })?;
132 *req.headers_mut() = headers;
133
134 let res = self
135 .0
136 .clone()
137 .send(req)
138 .await
139 .map_err(JsErrorBox::from_err)?;
140 let status = res.status();
141 Ok((url, status, FetchResponse(res)))
142 }
143}
144
145impl RemoteResponse for FetchResponse {
146 async fn bytes(self) -> Result<Bytes, JsErrorBox> {
147 Ok(self.0.collect().await?.to_bytes())
148 }
149 fn stream(
150 self,
151 ) -> impl Stream<Item = Result<Bytes, JsErrorBox>> + Send + Sync {
152 self.0.into_body().into_data_stream()
153 }
154 async fn text(self) -> Result<String, JsErrorBox> {
155 let bytes = self.bytes().await?;
156 Ok(
157 std::str::from_utf8(&bytes)
158 .map_err(JsErrorBox::from_err)?
159 .into(),
160 )
161 }
162}
163
164#[async_trait(?Send)]
165impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler
166 for RemoteDbHandler<P>
167{
168 type DB = Remote<PermissionChecker<P>, FetchClient>;
169
170 async fn open(
171 &self,
172 state: Rc<RefCell<OpState>>,
173 path: Option<String>,
174 ) -> Result<Self::DB, JsErrorBox> {
175 const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
176
177 let Some(url) = path else {
178 return Err(JsErrorBox::type_error("Missing database url"));
179 };
180
181 let Ok(parsed_url) = Url::parse(&url) else {
182 return Err(JsErrorBox::type_error(format!(
183 "Invalid database url: {}",
184 url
185 )));
186 };
187
188 {
189 let mut state = state.borrow_mut();
190 let permissions = state.borrow_mut::<P>();
191 permissions
192 .check_env(ENV_VAR_NAME)
193 .map_err(JsErrorBox::from_err)?;
194 permissions
195 .check_net_url(&parsed_url, "Deno.openKv")
196 .map_err(JsErrorBox::from_err)?;
197 }
198
199 let access_token = std::env::var(ENV_VAR_NAME)
200 .map_err(anyhow::Error::from)
201 .with_context(|| {
202 "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
203 }).map_err(|e| JsErrorBox::generic(e.to_string()))?;
204
205 let metadata_endpoint = MetadataEndpoint {
206 url: parsed_url.clone(),
207 access_token: access_token.clone(),
208 };
209
210 let options = &self.http_options;
211 let client = create_http_client(
212 &options.user_agent,
213 CreateHttpClientOptions {
214 root_cert_store: options.root_cert_store()?,
215 ca_certs: vec![],
216 proxy: options.proxy.clone(),
217 dns_resolver: Default::default(),
218 unsafely_ignore_certificate_errors: options
219 .unsafely_ignore_certificate_errors
220 .clone(),
221 client_cert_chain_and_key: options
222 .client_cert_chain_and_key
223 .clone()
224 .try_into()
225 .unwrap(),
226 pool_max_idle_per_host: None,
227 pool_idle_timeout: None,
228 http1: false,
229 http2: true,
230 local_address: None,
231 client_builder_hook: None,
232 },
233 )
234 .map_err(JsErrorBox::from_err)?;
235 let fetch_client = FetchClient(client);
236
237 let permissions = PermissionChecker {
238 state: state.clone(),
239 _permissions: PhantomData,
240 };
241
242 let remote = Remote::new(fetch_client, permissions, metadata_endpoint);
243
244 Ok(remote)
245 }
246}