1use std::cell::RefCell;
4use std::rc::Rc;
5use std::sync::Arc;
6
7use anyhow::Context;
8use async_trait::async_trait;
9use bytes::Bytes;
10use deno_core::OpState;
11use deno_core::futures::Stream;
12use deno_error::JsErrorBox;
13use deno_fetch::CreateHttpClientOptions;
14use deno_fetch::create_http_client;
15use deno_permissions::PermissionsContainer;
16use deno_tls::Proxy;
17use deno_tls::RootCertStoreProvider;
18use deno_tls::TlsKeys;
19use deno_tls::rustls::RootCertStore;
20use denokv_remote::MetadataEndpoint;
21use denokv_remote::Remote;
22use denokv_remote::RemoteResponse;
23use denokv_remote::RemoteTransport;
24use http_body_util::BodyExt;
25use url::Url;
26
27use crate::DatabaseHandler;
28
29#[derive(Clone)]
30pub struct HttpOptions {
31 pub user_agent: String,
32 pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
33 pub proxy: Option<Proxy>,
34 pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
35 pub client_cert_chain_and_key: TlsKeys,
36}
37
38impl HttpOptions {
39 pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
40 Ok(match &self.root_cert_store_provider {
41 Some(provider) => Some(provider.get_or_try_init()?.clone()),
42 None => None,
43 })
44 }
45}
46
47pub struct RemoteDbHandler {
48 http_options: HttpOptions,
49}
50
51impl RemoteDbHandler {
52 pub fn new(http_options: HttpOptions) -> Self {
53 Self { http_options }
54 }
55}
56
57pub struct PermissionChecker {
58 state: Rc<RefCell<OpState>>,
59}
60
61impl Clone for PermissionChecker {
62 fn clone(&self) -> Self {
63 Self {
64 state: self.state.clone(),
65 }
66 }
67}
68
69impl denokv_remote::RemotePermissions for PermissionChecker {
70 fn check_net_url(&self, url: &Url) -> Result<(), JsErrorBox> {
71 let mut state = self.state.borrow_mut();
72 let permissions = state.borrow_mut::<PermissionsContainer>();
73 permissions
74 .check_net_url(url, "Deno.openKv")
75 .map_err(JsErrorBox::from_err)
76 }
77}
78
79#[derive(Clone)]
80pub struct FetchClient(deno_fetch::Client);
81pub struct FetchResponse(http::Response<deno_fetch::ResBody>);
82
83impl RemoteTransport for FetchClient {
84 type Response = FetchResponse;
85 async fn post(
86 &self,
87 url: Url,
88 headers: http::HeaderMap,
89 body: Bytes,
90 ) -> Result<(Url, http::StatusCode, Self::Response), JsErrorBox> {
91 let body = deno_fetch::ReqBody::full(body);
92 let mut req = http::Request::new(body);
93 *req.method_mut() = http::Method::POST;
94 *req.uri_mut() =
95 url.as_str().parse().map_err(|e: http::uri::InvalidUri| {
96 JsErrorBox::type_error(e.to_string())
97 })?;
98 *req.headers_mut() = headers;
99
100 let res = self
101 .0
102 .clone()
103 .send(req)
104 .await
105 .map_err(JsErrorBox::from_err)?;
106 let status = res.status();
107 Ok((url, status, FetchResponse(res)))
108 }
109}
110
111impl RemoteResponse for FetchResponse {
112 async fn bytes(self) -> Result<Bytes, JsErrorBox> {
113 Ok(self.0.collect().await?.to_bytes())
114 }
115 fn stream(
116 self,
117 ) -> impl Stream<Item = Result<Bytes, JsErrorBox>> + Send + Sync {
118 self.0.into_body().into_data_stream()
119 }
120 async fn text(self) -> Result<String, JsErrorBox> {
121 let bytes = self.bytes().await?;
122 Ok(
123 std::str::from_utf8(&bytes)
124 .map_err(JsErrorBox::from_err)?
125 .into(),
126 )
127 }
128}
129
130#[async_trait(?Send)]
131impl DatabaseHandler for RemoteDbHandler {
132 type DB = Remote<PermissionChecker, FetchClient>;
133
134 async fn open(
135 &self,
136 state: Rc<RefCell<OpState>>,
137 path: Option<String>,
138 ) -> Result<Self::DB, JsErrorBox> {
139 const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
140
141 let Some(url) = path else {
142 return Err(JsErrorBox::type_error("Missing database url"));
143 };
144
145 let Ok(parsed_url) = Url::parse(&url) else {
146 return Err(JsErrorBox::type_error(format!(
147 "Invalid database url: {}",
148 url
149 )));
150 };
151
152 {
153 let mut state = state.borrow_mut();
154 let permissions = state.borrow_mut::<PermissionsContainer>();
155 permissions
156 .check_env(ENV_VAR_NAME)
157 .map_err(JsErrorBox::from_err)?;
158 permissions
159 .check_net_url(&parsed_url, "Deno.openKv")
160 .map_err(JsErrorBox::from_err)?;
161 }
162
163 let access_token = std::env::var(ENV_VAR_NAME)
164 .map_err(anyhow::Error::from)
165 .with_context(|| {
166 "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
167 }).map_err(|e| JsErrorBox::generic(e.to_string()))?;
168
169 let metadata_endpoint = MetadataEndpoint {
170 url: parsed_url.clone(),
171 access_token: access_token.clone(),
172 };
173
174 let options = &self.http_options;
175 let client = create_http_client(
176 &options.user_agent,
177 CreateHttpClientOptions {
178 root_cert_store: options.root_cert_store()?,
179 ca_certs: vec![],
180 proxy: options.proxy.clone(),
181 dns_resolver: Default::default(),
182 unsafely_ignore_certificate_errors: options
183 .unsafely_ignore_certificate_errors
184 .clone(),
185 client_cert_chain_and_key: options
186 .client_cert_chain_and_key
187 .clone()
188 .try_into()
189 .unwrap(),
190 pool_max_idle_per_host: None,
191 pool_idle_timeout: None,
192 http1: false,
193 http2: true,
194 local_address: None,
195 client_builder_hook: None,
196 },
197 )
198 .map_err(JsErrorBox::from_err)?;
199 let fetch_client = FetchClient(client);
200
201 let permissions = PermissionChecker {
202 state: state.clone(),
203 };
204
205 let remote = Remote::new(fetch_client, permissions, metadata_endpoint);
206
207 Ok(remote)
208 }
209}