viceroy_lib/
session.rs

1//! Session type and related facilities.
2
3mod async_item;
4mod downstream;
5
6pub use async_item::{
7    AsyncItem, PeekableTask, PendingCacheTask, PendingDownstreamReqTask, PendingKvDeleteTask,
8    PendingKvInsertTask, PendingKvListTask, PendingKvLookupTask,
9};
10
11use std::collections::HashMap;
12use std::future::Future;
13use std::io::Write;
14use std::net::IpAddr;
15use std::path::Path;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::cache::{Cache, CacheEntry};
21use crate::object_store::KvStoreError;
22use crate::wiggle_abi::types::{CacheBusyHandle, CacheHandle, FramingHeadersMode};
23
24use {
25    self::downstream::DownstreamResponseState,
26    crate::{
27        acl::Acl,
28        body::Body,
29        config::{Backend, Backends, Dictionaries, LoadedDictionary},
30        downstream::{DownstreamMetadata, DownstreamRequest},
31        error::{Error, HandleError},
32        logging::LogEndpoint,
33        object_store::{ObjectKey, ObjectStoreKey, ObjectStores, ObjectValue},
34        pushpin::PushpinRedirectInfo,
35        secret_store::{SecretLookup, SecretStores},
36        shielding_site::ShieldingSites,
37        streaming_body::StreamingBody,
38        upstream::{SelectTarget, TlsConfig},
39        wiggle_abi::types::{
40            self, AclHandle, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
41            KvInsertMode, KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle,
42            KvStoreListHandle, KvStoreLookupHandle, PendingKvDeleteHandle, PendingKvInsertHandle,
43            PendingKvListHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
44            RequestPromiseHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
45        },
46        ExecuteCtx,
47    },
48    cranelift_entity::{entity_impl, PrimaryMap},
49    futures::future::{self, FutureExt},
50    http::{request, response, HeaderMap, Response},
51};
52
53const NEXT_REQ_ACCEPT_MAX: usize = 5;
54const NEXT_REQ_TIMEOUT: Duration = Duration::from_secs(10);
55const NGWAF_ALLOW_VERDICT: &str = "allow";
56
57pub struct RequestParts {
58    parts: Option<request::Parts>,
59    metadata: Option<DownstreamMetadata>,
60}
61
62/// Data specific to an individual request, including any host-side
63/// allocations on behalf of the guest processing the request.
64pub struct Session {
65    session_id: u64,
66    /// The amount of time we've spent on this session in microseconds.
67    pub active_cpu_time_us: Arc<AtomicU64>,
68    /// Handle for the downstream request "parts". NB the backing parts data can be mutated
69    /// or even removed from the relevant map.
70    downstream_req_handle: RequestHandle,
71    /// Handle for the downstream request body. NB the backing body data can be mutated
72    /// or even removed from the relevant map.
73    downstream_req_body_handle: BodyHandle,
74    /// A channel for sending a [`Response`][resp] downstream to the client.
75    ///
76    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
77    downstream_resp: DownstreamResponseState,
78    /// Handle for receiving a new downstream request.
79    downstream_pending_handle: Option<AsyncItemHandle>,
80    /// A handle map for items that provide blocking operations. These items are grouped together
81    /// in order to support generic async operations that work across different object types.
82    async_items: PrimaryMap<AsyncItemHandle, Option<AsyncItem>>,
83    /// The context for executing the service that is shared between sessions.
84    ctx: Arc<ExecuteCtx>,
85    /// A handle map for the component [`Parts`][parts] of the session's HTTP [`Request`][req]s.
86    ///
87    /// [parts]: https://docs.rs/http/latest/http/request/struct.Parts.html
88    /// [req]: https://docs.rs/http/latest/http/request/struct.Request.html
89    req_parts: PrimaryMap<RequestHandle, RequestParts>,
90    /// A handle map for the component [`Parts`][parts] of the session's HTTP [`Response`][resp]s.
91    ///
92    /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
93    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
94    resp_parts: PrimaryMap<ResponseHandle, Option<response::Parts>>,
95    /// Where to direct logging endpoint messages.
96    capture_logs: Arc<Mutex<dyn Write + Send>>,
97    /// A handle map for logging endpoints.
98    log_endpoints: PrimaryMap<EndpointHandle, LogEndpoint>,
99    /// A by-name map for logging endpoints.
100    log_endpoints_by_name: HashMap<Vec<u8>, EndpointHandle>,
101    /// Active ACL handles.
102    acl_handles: PrimaryMap<AclHandle, Arc<Acl>>,
103    /// The NGWAF verdict to return when using the `inspect` hostcall.
104    ngwaf_verdict: String,
105    /// The backends dynamically added by the program. This is separated from
106    /// `backends` because we do not want one session to effect the backends
107    /// available to any other session.
108    dynamic_backends: Backends,
109    /// The dictionaries that have been opened by the guest.
110    loaded_dictionaries: PrimaryMap<DictionaryHandle, LoadedDictionary>,
111    /// The object stores configured for this execution.
112    ///
113    /// Populated prior to guest execution.
114    kv_store_by_name: PrimaryMap<KvStoreHandle, ObjectStoreKey>,
115    /// The secret stores configured for this execution.
116    ///
117    /// Populated prior to guest execution, and never modified.
118    secret_stores_by_name: PrimaryMap<SecretStoreHandle, String>,
119    /// The secrets for this execution.
120    ///
121    /// Populated prior to guest execution, and never modified.
122    secrets_by_name: PrimaryMap<SecretHandle, SecretLookup>,
123    /// How many additional downstream requests have been receive by this Session.
124    next_req_accepted: usize,
125}
126
127impl Session {
128    /// Create an empty session.
129    #[allow(clippy::too_many_arguments)]
130    pub fn new(
131        downstream: DownstreamRequest,
132        active_cpu_time_us: Arc<AtomicU64>,
133        ctx: Arc<ExecuteCtx>,
134    ) -> Session {
135        let (parts, body) = downstream.req.into_parts();
136
137        let mut async_items: PrimaryMap<AsyncItemHandle, Option<AsyncItem>> = PrimaryMap::new();
138        let mut req_parts = PrimaryMap::new();
139
140        let session_id = downstream.metadata.req_id;
141        let downstream_req_handle = req_parts.push(RequestParts {
142            parts: Some(parts),
143            metadata: Some(downstream.metadata),
144        });
145        let downstream_req_body_handle = async_items.push(Some(AsyncItem::Body(body))).into();
146
147        Session {
148            session_id,
149            downstream_req_handle,
150            downstream_req_body_handle,
151            active_cpu_time_us,
152            async_items,
153            req_parts,
154            resp_parts: PrimaryMap::new(),
155            downstream_resp: DownstreamResponseState::new(downstream.sender),
156            capture_logs: ctx.capture_logs(),
157            log_endpoints: PrimaryMap::new(),
158            log_endpoints_by_name: HashMap::new(),
159            acl_handles: PrimaryMap::new(),
160            ngwaf_verdict: NGWAF_ALLOW_VERDICT.to_string(),
161            dynamic_backends: Backends::default(),
162            loaded_dictionaries: PrimaryMap::new(),
163            kv_store_by_name: PrimaryMap::new(),
164            secret_stores_by_name: PrimaryMap::new(),
165            secrets_by_name: PrimaryMap::new(),
166            downstream_pending_handle: None,
167            next_req_accepted: 0,
168
169            ctx,
170        }
171    }
172
173    // ----- Downstream Request API -----
174
175    /// Retrieve the downstream metadata address associated with a request handle.
176    pub fn downstream_metadata(
177        &self,
178        handle: RequestHandle,
179    ) -> Result<Option<&DownstreamMetadata>, HandleError> {
180        self.req_parts
181            .get(handle)
182            .ok_or(HandleError::InvalidRequestHandle(handle))
183            .map(|r| r.metadata.as_ref())
184    }
185
186    /// Retrieve the downstream client IP address associated with a request handle.
187    pub fn downstream_client_ip(
188        &self,
189        handle: RequestHandle,
190    ) -> Result<Option<IpAddr>, HandleError> {
191        Ok(self
192            .downstream_metadata(handle)?
193            .map(|md| md.client_addr.ip()))
194    }
195
196    /// Retrieve the IP address the downstream client connected to a request handle.
197    pub fn downstream_server_ip(
198        &self,
199        handle: RequestHandle,
200    ) -> Result<Option<IpAddr>, HandleError> {
201        Ok(self
202            .downstream_metadata(handle)?
203            .map(|md| md.server_addr.ip()))
204    }
205
206    /// Retrieve the compliance region that received the request for the given handle.
207    pub fn downstream_compliance_region(
208        &self,
209        handle: RequestHandle,
210    ) -> Result<Option<&str>, HandleError> {
211        Ok(self
212            .downstream_metadata(handle)?
213            .map(|md| md.compliance_region.as_str()))
214    }
215
216    /// Retrieve the request ID for the given request handle.
217    pub fn downstream_request_id(&self, handle: RequestHandle) -> Result<Option<u64>, HandleError> {
218        Ok(self.downstream_metadata(handle)?.map(|md| md.req_id))
219    }
220
221    /// Retrieve the handle corresponding to the most recent downstream request.
222    pub fn downstream_request(&self) -> RequestHandle {
223        self.downstream_req_handle
224    }
225
226    /// Retrieve the handle corresponding to the downstream request body.
227    pub fn downstream_request_body(&self) -> BodyHandle {
228        self.downstream_req_body_handle
229    }
230
231    /// Access the header map that was copied from the original downstream request.
232    pub fn downstream_original_headers(
233        &self,
234        handle: RequestHandle,
235    ) -> Result<Option<&HeaderMap>, HandleError> {
236        Ok(self
237            .downstream_metadata(handle)?
238            .map(|md| &md.original_headers))
239    }
240
241    // ----- Downstream Response API -----
242
243    /// Send the downstream response.
244    ///
245    /// Yield an error if a response has already been sent.
246    ///
247    /// # Panics
248    ///
249    /// This method must only be called once per downstream request, after which attempting
250    /// to send another response will trigger a panic.
251    pub fn send_downstream_response(&mut self, resp: Response<Body>) -> Result<(), Error> {
252        self.downstream_resp.send(resp)
253    }
254
255    /// Redirect the downstream request to Pushpin.
256    ///
257    /// Yield an error if a response has already been sent.
258    ///
259    /// # Panics
260    ///
261    /// This method must only be called once per downstream request, after which attempting
262    /// to send another response will trigger a panic.
263    pub fn redirect_downstream_to_pushpin(
264        &mut self,
265        redirect_info: PushpinRedirectInfo,
266    ) -> Result<(), Error> {
267        self.downstream_resp.redirect_to_pushpin(redirect_info)
268    }
269
270    /// Ensure the downstream response sender is closed, and send the provided response if it
271    /// isn't.
272    pub fn close_downstream_response_sender(&mut self, resp: Response<Body>) {
273        let _ = self.downstream_resp.send(resp);
274    }
275
276    // ----- Bodies API -----
277
278    /// Insert a [`Body`][body] into the session.
279    ///
280    /// This method returns the [`BodyHandle`][handle], which can then be used to access and mutate
281    /// the response parts.
282    ///
283    /// [handle]: ../wiggle_abi/types/struct.BodyHandle.html
284    /// [body]: ../body/struct.Body.html
285    pub fn insert_body(&mut self, body: Body) -> BodyHandle {
286        self.async_items.push(Some(AsyncItem::Body(body))).into()
287    }
288
289    /// Get a reference to a [`Body`][body], given its [`BodyHandle`][handle].
290    ///
291    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
292    ///
293    /// [body]: ../body/struct.Body.html
294    /// [err]: ../error/enum.HandleError.html
295    /// [handle]: ../wiggle_abi/types/struct.BodyHandle.html
296    pub fn body(&self, handle: BodyHandle) -> Result<&Body, HandleError> {
297        self.async_items
298            .get(handle.into())
299            .and_then(Option::as_ref)
300            .and_then(AsyncItem::as_body)
301            .ok_or(HandleError::InvalidBodyHandle(handle))
302    }
303
304    /// Get a mutable reference to a [`Body`][body], given its [`BodyHandle`][handle].
305    ///
306    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
307    ///
308    /// [body]: ../body/struct.Body.html
309    /// [err]: ../error/enum.HandleError.html
310    /// [handle]: ../wiggle_abi/types/struct.BodyHandle.html
311    pub fn body_mut(&mut self, handle: BodyHandle) -> Result<&mut Body, HandleError> {
312        self.async_items
313            .get_mut(handle.into())
314            .and_then(Option::as_mut)
315            .and_then(AsyncItem::as_body_mut)
316            .ok_or(HandleError::InvalidBodyHandle(handle))
317    }
318
319    /// Take ownership of a [`Body`][body], given its [`BodyHandle`][handle].
320    ///
321    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
322    ///
323    /// [body]: ../body/struct.Body.html
324    /// [err]: ../error/enum.HandleError.html
325    /// [handle]: ../wiggle_abi/types/struct.BodyHandle.html
326    pub fn take_body(&mut self, handle: BodyHandle) -> Result<Body, HandleError> {
327        self.async_items
328            .get_mut(handle.into())
329            .and_then(Option::take)
330            .and_then(AsyncItem::into_body)
331            .ok_or(HandleError::InvalidBodyHandle(handle))
332    }
333
334    /// Drop a [`Body`][crate::body::Body] from the [`Session`], given its [`BodyHandle`][crate::wiggle_abi::types::BodyHandle].
335    ///
336    /// Returns a [`HandleError`][crate::error::HandleError] if the handle is not associated with a body in the session.
337    pub fn drop_body(&mut self, handle: BodyHandle) -> Result<(), HandleError> {
338        self.async_items
339            .get_mut(handle.into())
340            .and_then(Option::take)
341            .map(drop)
342            .ok_or(HandleError::InvalidBodyHandle(handle))
343    }
344
345    /// Transition a normal [`Body`][body] into the write end of a streaming body, returning
346    /// the original body with the read end appended.
347    ///
348    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
349    ///
350    /// [body]: ../body/struct.Body.html
351    /// [err]: ../error/enum.HandleError.html
352    pub fn begin_streaming(&mut self, handle: BodyHandle) -> Result<Body, HandleError> {
353        self.async_items
354            .get_mut(handle.into())
355            .and_then(Option::as_mut)
356            .and_then(AsyncItem::begin_streaming)
357            .ok_or(HandleError::InvalidBodyHandle(handle))
358    }
359
360    /// Returns `true` if and only if the provided `BodyHandle` is the downstream body being sent.
361    ///
362    /// To get a mutable reference to the streaming body `Sender`, see
363    /// [`Session::streaming_body_mut`](struct.Session.html#method.streaming_body_mut).
364    pub fn is_streaming_body(&self, handle: BodyHandle) -> bool {
365        if let Some(Some(body)) = self.async_items.get(handle.into()) {
366            body.is_streaming()
367        } else {
368            false
369        }
370    }
371
372    /// Get a mutable reference to the streaming body `Sender`, if and only if the provided
373    /// `BodyHandle` is the downstream body being sent.
374    ///
375    /// To check if a handle is the currently-streaming downstream response body, see
376    /// [`Session::is_streaming_body`](struct.Session.html#method.is_streaming_body).
377    ///
378    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
379    ///
380    /// [err]: ../error/enum.HandleError.html
381    pub fn streaming_body_mut(
382        &mut self,
383        handle: BodyHandle,
384    ) -> Result<&mut StreamingBody, HandleError> {
385        self.async_items
386            .get_mut(handle.into())
387            .and_then(Option::as_mut)
388            .and_then(AsyncItem::as_streaming_mut)
389            .ok_or(HandleError::InvalidBodyHandle(handle))
390    }
391
392    /// Take ownership of a streaming body `Sender`, if and only if the provided
393    /// `BodyHandle` is the downstream body being sent.
394    ///
395    /// To check if a handle is the currently-streaming downstream response body, see
396    /// [`Session::is_streaming_body`](struct.Session.html#method.is_streaming_body).
397    ///
398    /// Returns a [`HandleError`][err] if the handle is not associated with a body in the session.
399    ///
400    /// [err]: ../error/enum.HandleError.html
401    pub fn take_streaming_body(
402        &mut self,
403        handle: BodyHandle,
404    ) -> Result<StreamingBody, HandleError> {
405        self.async_items
406            .get_mut(handle.into())
407            .and_then(Option::take)
408            .and_then(AsyncItem::into_streaming)
409            .ok_or(HandleError::InvalidBodyHandle(handle))
410    }
411
412    // ----- Request Parts API -----
413
414    /// Insert the [`Parts`][parts] of a [`Request`][req] into the session.
415    ///
416    /// This method returns a new [`RequestHandle`][handle], which can then be used to access
417    /// and mutate the request parts.
418    ///
419    /// [handle]: ../wiggle_abi/types/struct.RequestHandle.html
420    /// [parts]: https://docs.rs/http/latest/http/request/struct.Parts.html
421    /// [req]: https://docs.rs/http/latest/http/request/struct.Request.html
422    pub fn insert_request_parts(&mut self, parts: request::Parts) -> RequestHandle {
423        self.req_parts.push(RequestParts {
424            parts: Some(parts),
425            metadata: None,
426        })
427    }
428
429    /// Get a reference to the [`Parts`][parts] of a [`Request`][req], given its
430    /// [`RequestHandle`][handle].
431    ///
432    /// Returns a [`HandleError`][err] if the handle is not associated with a request in the
433    /// session.
434    ///
435    /// [err]: ../error/enum.HandleError.html
436    /// [handle]: ../wiggle_abi/types/struct.RequestHandle.html
437    /// [parts]: https://docs.rs/http/latest/http/request/struct.Parts.html
438    /// [req]: https://docs.rs/http/latest/http/request/struct.Request.html
439    pub fn request_parts(&self, handle: RequestHandle) -> Result<&request::Parts, HandleError> {
440        self.req_parts
441            .get(handle)
442            .and_then(|r| r.parts.as_ref())
443            .ok_or(HandleError::InvalidRequestHandle(handle))
444    }
445
446    /// Get a mutable reference to the [`Parts`][parts] of a [`Request`][req], given its
447    /// [`RequestHandle`][handle].
448    ///
449    /// Returns a [`HandleError`][err] if the handle is not associated with a request in the
450    /// session.
451    ///
452    /// [err]: ../error/enum.HandleError.html
453    /// [handle]: ../wiggle_abi/types/struct.RequestHandle.html
454    /// [parts]: https://docs.rs/http/latest/http/request/struct.Parts.html
455    /// [req]: https://docs.rs/http/latest/http/request/struct.Request.html
456    pub fn request_parts_mut(
457        &mut self,
458        handle: RequestHandle,
459    ) -> Result<&mut request::Parts, HandleError> {
460        self.req_parts
461            .get_mut(handle)
462            .and_then(|r| r.parts.as_mut())
463            .ok_or(HandleError::InvalidRequestHandle(handle))
464    }
465
466    /// Take ownership of the [`Parts`][parts] of a [`Request`][req], given its
467    /// [`RequestHandle`][handle].
468    ///
469    /// Returns a [`HandleError`][err] if the handle is not associated with a request in the
470    /// session.
471    ///
472    /// [err]: ../error/enum.HandleError.html
473    /// [handle]: ../wiggle_abi/types/struct.RequestHandle.html
474    /// [parts]: https://docs.rs/http/latest/http/request/struct.Parts.html
475    /// [req]: https://docs.rs/http/latest/http/request/struct.Request.html
476    pub fn take_request_parts(
477        &mut self,
478        handle: RequestHandle,
479    ) -> Result<request::Parts, HandleError> {
480        self.req_parts
481            .get_mut(handle)
482            .and_then(|r| r.parts.take())
483            .ok_or(HandleError::InvalidRequestHandle(handle))
484    }
485
486    // ----- Response Parts API -----
487
488    /// Insert the [`Parts`][parts] of a [`Response`][resp] into the session.
489    ///
490    /// This method returns a new [`ResponseHandle`][handle], which can then be used to access
491    /// and mutate the response parts.
492    ///
493    /// [handle]: ../wiggle_abi/types/struct.ResponseHandle.html
494    /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
495    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
496    pub fn insert_response_parts(&mut self, parts: response::Parts) -> ResponseHandle {
497        self.resp_parts.push(Some(parts))
498    }
499
500    /// Get a reference to the [`Parts`][parts] of a [`Response`][resp], given its
501    /// [`ResponseHandle`][handle].
502    ///
503    /// Returns a [`HandleError`][err] if the handle is not associated with a response in the
504    /// session.
505    ///
506    /// [err]: ../error/enum.HandleError.html
507    /// [handle]: ../wiggle_abi/types/struct.ResponseHandle.html
508    /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
509    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
510    pub fn response_parts(&self, handle: ResponseHandle) -> Result<&response::Parts, HandleError> {
511        self.resp_parts
512            .get(handle)
513            .and_then(Option::as_ref)
514            .ok_or(HandleError::InvalidResponseHandle(handle))
515    }
516
517    /// Get a mutable reference to the [`Parts`][parts] of a [`Response`][resp], given its
518    /// [`ResponseHandle`][handle].
519    ///
520    /// Returns a [`HandleError`][err] if the handle is not associated with a response in the
521    /// session.
522    ///
523    /// [err]: ../error/enum.HandleError.html
524    /// [handle]: ../wiggle_abi/types/struct.ResponseHandle.html
525    /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
526    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
527    pub fn response_parts_mut(
528        &mut self,
529        handle: ResponseHandle,
530    ) -> Result<&mut response::Parts, HandleError> {
531        self.resp_parts
532            .get_mut(handle)
533            .and_then(Option::as_mut)
534            .ok_or(HandleError::InvalidResponseHandle(handle))
535    }
536
537    /// Take ownership of the [`Parts`][parts] of a [`Response`][resp], given its
538    /// [`ResponseHandle`][handle].
539    ///
540    /// Returns a [`HandleError`][err] if the handle is not associated with a response in the
541    /// session.
542    ///
543    /// [err]: ../error/enum.HandleError.html
544    /// [handle]: ../wiggle_abi/types/struct.ResponseHandle.html
545    /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
546    /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
547    pub fn take_response_parts(
548        &mut self,
549        handle: ResponseHandle,
550    ) -> Result<response::Parts, HandleError> {
551        self.resp_parts
552            .get_mut(handle)
553            .and_then(Option::take)
554            .ok_or(HandleError::InvalidResponseHandle(handle))
555    }
556
557    pub fn insert_response(&mut self, resp: Response<Body>) -> (ResponseHandle, BodyHandle) {
558        let (resp_parts, resp_body) = resp.into_parts();
559        let resp_handle = self.insert_response_parts(resp_parts);
560        let body_handle = self.insert_body(resp_body);
561        (resp_handle, body_handle)
562    }
563
564    // ----- Logging Endpoints API -----
565
566    /// Get an [`EndpointHandle`][handle] from the session, corresponding to the provided
567    /// endpoint name. A new backing [`LogEndpoint`] will be created if one does not
568    /// already exist.
569    ///
570    /// [handle]: ../wiggle_abi/types/struct.EndpointHandle.html
571    /// [endpoint]: ../logging/struct.LogEndpoint.html
572    pub fn log_endpoint_handle(&mut self, name: &[u8]) -> EndpointHandle {
573        if let Some(handle) = self.log_endpoints_by_name.get(name).copied() {
574            return handle;
575        }
576        let endpoint = LogEndpoint::new(name, self.capture_logs.clone());
577        let handle = self.log_endpoints.push(endpoint);
578        self.log_endpoints_by_name.insert(name.to_owned(), handle);
579        handle
580    }
581
582    /// Get a reference to a [`LogEndpoint`][endpoint], given its [`EndpointHandle`][handle].
583    ///
584    /// Returns a [`HandleError`][err] if the handle is not associated with an endpoint in the
585    /// session.
586    ///
587    /// [err]: ../error/enum.HandleError.html
588    /// [handle]: ../wiggle_abi/types/struct.EndpointHandle.html
589    /// [endpoint]: ../logging/struct.LogEndpoint.html
590    pub fn log_endpoint(&self, handle: EndpointHandle) -> Result<&LogEndpoint, HandleError> {
591        self.log_endpoints
592            .get(handle)
593            .ok_or(HandleError::InvalidEndpointHandle(handle))
594    }
595
596    // ----- ACLs API -----
597
598    pub fn acl_handle_by_name(&mut self, name: &str) -> Option<AclHandle> {
599        let acl = self.ctx.acls().get_acl(name)?;
600        Some(self.acl_handles.push(acl.clone()))
601    }
602
603    pub fn acl_by_handle(&self, handle: AclHandle) -> Option<Arc<Acl>> {
604        self.acl_handles.get(handle).map(Arc::clone)
605    }
606
607    // ----- Backends API -----
608
609    /// Get the collection of static backends.
610    pub fn backends(&self) -> &Backends {
611        self.ctx.backends()
612    }
613
614    /// Look up a backend by name.
615    pub fn backend(&self, name: &str) -> Option<&Arc<Backend>> {
616        // it doesn't actually matter what order we do this search, because
617        // the namespaces should be unique.
618        self.backends()
619            .get(name)
620            .or_else(|| self.dynamic_backends.get(name))
621    }
622
623    /// Look up a dynamic backend (only) by name.
624    pub fn dynamic_backend(&self, name: &str) -> Option<&Arc<Backend>> {
625        self.dynamic_backends.get(name)
626    }
627
628    /// Return the full list of static and dynamic backend names as an [`Iterator`].
629    pub fn backend_names(&self) -> impl Iterator<Item = &String> {
630        self.backends().keys().chain(self.dynamic_backends.keys())
631    }
632
633    /// Try to add a backend with the given name prefix to our set of current backends.
634    /// Upon success, return true. If the name already exists somewhere, return false;
635    /// the caller should signal an appropriate error.
636    pub fn add_backend(&mut self, name: &str, info: Backend) -> bool {
637        // if this name already exists, either as a built in or dynamic backend, say no
638        if self.backends().contains_key(name) || self.dynamic_backends.contains_key(name) {
639            return false;
640        }
641
642        self.dynamic_backends
643            .insert(name.to_string(), Arc::new(info));
644
645        true
646    }
647
648    // ----- TLS config -----
649
650    /// Access the TLS configuration.
651    pub fn tls_config(&self) -> &TlsConfig {
652        self.ctx.tls_config()
653    }
654
655    // ----- Device Detection API -----
656
657    pub fn device_detection_lookup(&self, user_agent: &str) -> Option<String> {
658        self.ctx
659            .device_detection()
660            .lookup(user_agent)
661            .map(|data| data.to_string())
662    }
663
664    // ----- Dictionaries API -----
665
666    /// Look up a dictionary-handle by name.
667    pub fn dictionary_handle(&mut self, name: &str) -> Result<DictionaryHandle, Error> {
668        if let Some(dict) = self.dictionaries().get(name) {
669            let loaded = dict.load().map_err(|err| Error::Other(err.into()))?;
670            Ok(self.loaded_dictionaries.push(loaded))
671        } else {
672            Err(Error::DictionaryError(
673                crate::wiggle_abi::DictionaryError::UnknownDictionary(name.to_owned()),
674            ))
675        }
676    }
677
678    /// Look up a dictionary by dictionary-handle.
679    pub fn dictionary(&self, handle: DictionaryHandle) -> Result<&LoadedDictionary, HandleError> {
680        self.loaded_dictionaries
681            .get(handle)
682            .ok_or(HandleError::InvalidDictionaryHandle(handle))
683    }
684
685    /// Access the dictionary map.
686    pub fn dictionaries(&self) -> &Dictionaries {
687        self.ctx.dictionaries()
688    }
689
690    // ----- Geolocation API -----
691
692    pub fn geolocation_lookup(&self, addr: &IpAddr) -> Option<String> {
693        self.ctx
694            .geolocation()
695            .lookup(addr)
696            .map(|data| data.to_string())
697    }
698
699    // ----- NGWAF Inspect API -----
700
701    /// Retrieve the compliance region that received the request for this session.
702    pub fn ngwaf_response(&self) -> String {
703        format!(
704            r#"{{"waf_response":200,"redirect_url":"","tags":[],"verdict":"{}","decision_ms":0}}"#,
705            self.ngwaf_verdict
706        )
707    }
708
709    // ----- KV Store API -----
710
711    pub fn kv_store(&self) -> &ObjectStores {
712        self.ctx.object_store()
713    }
714
715    pub fn kv_store_handle(&mut self, key: &str) -> KvStoreHandle {
716        let obj_key = ObjectStoreKey::new(key);
717        self.kv_store_by_name.push(obj_key)
718    }
719
720    pub fn get_kv_store_key(&self, handle: KvStoreHandle) -> Option<&ObjectStoreKey> {
721        self.kv_store_by_name.get(handle)
722    }
723
724    pub fn kv_insert(
725        &self,
726        obj_store_key: ObjectStoreKey,
727        obj_key: ObjectKey,
728        obj: Vec<u8>,
729        mode: Option<KvInsertMode>,
730        generation: Option<u64>,
731        metadata: Option<String>,
732        ttl: Option<Duration>,
733    ) -> Result<(), KvStoreError> {
734        let mode = match mode {
735            None => KvInsertMode::Overwrite,
736            Some(m) => m,
737        };
738
739        self.kv_store()
740            .insert(obj_store_key, obj_key, obj, mode, generation, metadata, ttl)
741    }
742
743    /// Insert a [`PendingKvInsert`] into the session.
744    ///
745    /// This method returns a new [`PendingKvInsertHandle`], which can then be used to access
746    /// and mutate the pending insert.
747    pub fn insert_pending_kv_insert(
748        &mut self,
749        pending: PendingKvInsertTask,
750    ) -> KvStoreInsertHandle {
751        self.async_items
752            .push(Some(AsyncItem::PendingKvInsert(pending)))
753            .into()
754    }
755
756    /// Take ownership of a [`PendingKvInsert`], given its [`PendingKvInsertHandle`].
757    ///
758    /// Returns a [`HandleError`] if the handle is not associated with a pending insert in the
759    /// session.
760    pub fn take_pending_kv_insert(
761        &mut self,
762        handle: PendingKvInsertHandle,
763    ) -> Result<PendingKvInsertTask, HandleError> {
764        // check that this is a pending request before removing it
765        let _ = self.pending_kv_insert(handle)?;
766
767        self.async_items
768            .get_mut(handle.into())
769            .and_then(Option::take)
770            .and_then(AsyncItem::into_pending_kv_insert)
771            .ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
772    }
773
774    /// Get a reference to a [`PendingInsert`], given its [`PendingKvInsertHandle`].
775    ///
776    /// Returns a [`HandleError`] if the handle is not associated with a insert in the
777    /// session.
778    pub fn pending_kv_insert(
779        &self,
780        handle: PendingKvInsertHandle,
781    ) -> Result<&PendingKvInsertTask, HandleError> {
782        self.async_items
783            .get(handle.into())
784            .and_then(Option::as_ref)
785            .and_then(AsyncItem::as_pending_kv_insert)
786            .ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
787    }
788
789    pub fn kv_delete(
790        &self,
791        obj_store_key: ObjectStoreKey,
792        obj_key: ObjectKey,
793    ) -> Result<bool, KvStoreError> {
794        self.kv_store().delete(obj_store_key, obj_key)
795    }
796
797    /// Insert a [`PendingKvDelete`] into the session.
798    ///
799    /// This method returns a new [`PendingKvDeleteHandle`], which can then be used to access
800    /// and mutate the pending delete.
801    pub fn insert_pending_kv_delete(
802        &mut self,
803        pending: PendingKvDeleteTask,
804    ) -> PendingKvDeleteHandle {
805        self.async_items
806            .push(Some(AsyncItem::PendingKvDelete(pending)))
807            .into()
808    }
809
810    /// Take ownership of a [`PendingKvDelete`], given its [`PendingKvDeleteHandle`].
811    ///
812    /// Returns a [`HandleError`] if the handle is not associated with a pending delete in the
813    /// session.
814    pub fn take_pending_kv_delete(
815        &mut self,
816        handle: PendingKvDeleteHandle,
817    ) -> Result<PendingKvDeleteTask, HandleError> {
818        // check that this is a pending request before removing it
819        let _ = self.pending_kv_delete(handle)?;
820
821        self.async_items
822            .get_mut(handle.into())
823            .and_then(Option::take)
824            .and_then(AsyncItem::into_pending_kv_delete)
825            .ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
826    }
827
828    /// Get a reference to a [`PendingDelete`], given its [`PendingKvDeleteHandle`].
829    ///
830    /// Returns a [`HandleError`] if the handle is not associated with a delete in the
831    /// session.
832    pub fn pending_kv_delete(
833        &self,
834        handle: PendingKvDeleteHandle,
835    ) -> Result<&PendingKvDeleteTask, HandleError> {
836        self.async_items
837            .get(handle.into())
838            .and_then(Option::as_ref)
839            .and_then(AsyncItem::as_pending_kv_delete)
840            .ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
841    }
842
843    pub fn obj_lookup(
844        &self,
845        obj_store_key: ObjectStoreKey,
846        obj_key: ObjectKey,
847    ) -> Result<Option<ObjectValue>, KvStoreError> {
848        self.kv_store().lookup(obj_store_key, obj_key)
849    }
850
851    /// Insert a [`PendingLookup`] into the session.
852    ///
853    /// This method returns a new [`PendingKvLookupHandle`], which can then be used to access
854    /// and mutate the pending lookup.
855    pub fn insert_pending_kv_lookup(
856        &mut self,
857        pending: PendingKvLookupTask,
858    ) -> PendingKvLookupHandle {
859        self.async_items
860            .push(Some(AsyncItem::PendingKvLookup(pending)))
861            .into()
862    }
863
864    /// Take ownership of a [`PendingLookup`], given its [`PendingKvLookupHandle`].
865    ///
866    /// Returns a [`HandleError`] if the handle is not associated with a pending lookup in the
867    /// session.
868    pub fn take_pending_kv_lookup(
869        &mut self,
870        handle: PendingKvLookupHandle,
871    ) -> Result<PendingKvLookupTask, HandleError> {
872        // check that this is a pending request before removing it
873        let _ = self.pending_kv_lookup(handle)?;
874
875        self.async_items
876            .get_mut(handle.into())
877            .and_then(Option::take)
878            .and_then(AsyncItem::into_pending_kv_lookup)
879            .ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
880    }
881
882    /// Get a reference to a [`PendingLookup`], given its [`PendingKvLookupHandle`].
883    ///
884    /// Returns a [`HandleError`] if the handle is not associated with a lookup in the
885    /// session.
886    pub fn pending_kv_lookup(
887        &self,
888        handle: PendingKvLookupHandle,
889    ) -> Result<&PendingKvLookupTask, HandleError> {
890        self.async_items
891            .get(handle.into())
892            .and_then(Option::as_ref)
893            .and_then(AsyncItem::as_pending_kv_lookup)
894            .ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
895    }
896
897    pub fn kv_list(
898        &self,
899        obj_store_key: ObjectStoreKey,
900        cursor: Option<String>,
901        prefix: Option<String>,
902        limit: Option<u32>,
903    ) -> Result<Vec<u8>, KvStoreError> {
904        let limit = limit.unwrap_or(1000);
905
906        self.kv_store().list(obj_store_key, cursor, prefix, limit)
907    }
908
909    /// Insert a [`PendingList`] into the session.
910    ///
911    /// This method returns a new [`PendingKvListHandle`], which can then be used to access
912    /// and mutate the pending list.
913    pub fn insert_pending_kv_list(&mut self, pending: PendingKvListTask) -> PendingKvListHandle {
914        self.async_items
915            .push(Some(AsyncItem::PendingKvList(pending)))
916            .into()
917    }
918
919    /// Take ownership of a [`PendingList`], given its [`PendingKvListHandle`].
920    ///
921    /// Returns a [`HandleError`] if the handle is not associated with a pending list in the
922    /// session.
923    pub fn take_pending_kv_list(
924        &mut self,
925        handle: PendingKvListHandle,
926    ) -> Result<PendingKvListTask, HandleError> {
927        // check that this is a pending request before removing it
928        let _ = self.pending_kv_list(handle)?;
929
930        self.async_items
931            .get_mut(handle.into())
932            .and_then(Option::take)
933            .and_then(AsyncItem::into_pending_kv_list)
934            .ok_or(HandleError::InvalidPendingKvListHandle(handle))
935    }
936
937    /// Get a reference to a [`PendingList`], given its [`PendingKvListHandle`].
938    ///
939    /// Returns a [`HandleError`] if the handle is not associated with a list in the
940    /// session.
941    pub fn pending_kv_list(
942        &self,
943        handle: PendingKvListHandle,
944    ) -> Result<&PendingKvListTask, HandleError> {
945        self.async_items
946            .get(handle.into())
947            .and_then(Option::as_ref)
948            .and_then(AsyncItem::as_pending_kv_list)
949            .ok_or(HandleError::InvalidPendingKvListHandle(handle))
950    }
951
952    // ----- Secret Store API -----
953
954    pub fn secret_store_handle(&mut self, name: &str) -> Option<SecretStoreHandle> {
955        self.secret_stores().get_store(name)?;
956        Some(self.secret_stores_by_name.push(name.to_string()))
957    }
958
959    pub fn secret_store_name(&self, handle: SecretStoreHandle) -> Option<String> {
960        self.secret_stores_by_name.get(handle).cloned()
961    }
962
963    pub fn secret_handle(&mut self, store_name: &str, secret_name: &str) -> Option<SecretHandle> {
964        self.secret_stores()
965            .get_store(store_name)?
966            .get_secret(secret_name)?;
967        Some(self.secrets_by_name.push(SecretLookup::Standard {
968            store_name: store_name.to_string(),
969            secret_name: secret_name.to_string(),
970        }))
971    }
972
973    pub fn secret_lookup(&self, handle: SecretHandle) -> Option<SecretLookup> {
974        self.secrets_by_name.get(handle).cloned()
975    }
976
977    pub fn add_secret(&mut self, plaintext: Vec<u8>) -> SecretHandle {
978        self.secrets_by_name
979            .push(SecretLookup::Injected { plaintext })
980    }
981
982    pub fn secret_stores(&self) -> &SecretStores {
983        self.ctx.secret_stores()
984    }
985
986    // ----- Pending Requests API -----
987
988    /// Insert a [`PendingRequest`] into the session.
989    ///
990    /// This method returns a new [`PendingRequestHandle`], which can then be used to access
991    /// and mutate the pending request.
992    pub fn insert_pending_request(
993        &mut self,
994        pending: PeekableTask<Response<Body>>,
995    ) -> PendingRequestHandle {
996        self.async_items
997            .push(Some(AsyncItem::PendingReq(pending)))
998            .into()
999    }
1000
1001    /// Get a reference to a [`PendingRequest`], given its [`PendingRequestHandle`].
1002    ///
1003    /// Returns a [`HandleError`] if the handle is not associated with a request in the
1004    /// session.
1005    pub fn pending_request(
1006        &self,
1007        handle: PendingRequestHandle,
1008    ) -> Result<&PeekableTask<Response<Body>>, HandleError> {
1009        self.async_items
1010            .get(handle.into())
1011            .and_then(Option::as_ref)
1012            .and_then(AsyncItem::as_pending_req)
1013            .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1014    }
1015
1016    /// Get a mutable reference to a [`PendingRequest`], given its [`PendingRequestHandle`].
1017    ///
1018    /// Returns a [`HandleError`] if the handle is not associated with a request in the
1019    /// session.
1020    pub fn pending_request_mut(
1021        &mut self,
1022        handle: PendingRequestHandle,
1023    ) -> Result<&mut PeekableTask<Response<Body>>, HandleError> {
1024        self.async_items
1025            .get_mut(handle.into())
1026            .and_then(Option::as_mut)
1027            .and_then(AsyncItem::as_pending_req_mut)
1028            .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1029    }
1030
1031    /// Take ownership of a [`PendingRequest`], given its [`PendingRequestHandle`].
1032    ///
1033    /// Returns a [`HandleError`] if the handle is not associated with a pending request in the
1034    /// session.
1035    pub fn take_pending_request(
1036        &mut self,
1037        handle: PendingRequestHandle,
1038    ) -> Result<PeekableTask<Response<Body>>, HandleError> {
1039        // check that this is a pending request before removing it
1040        let _ = self.pending_request(handle)?;
1041
1042        self.async_items
1043            .get_mut(handle.into())
1044            .and_then(Option::take)
1045            .and_then(AsyncItem::into_pending_req)
1046            .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1047    }
1048
1049    pub fn reinsert_pending_request(
1050        &mut self,
1051        handle: PendingRequestHandle,
1052        pending_req: PeekableTask<Response<Body>>,
1053    ) -> Result<(), HandleError> {
1054        *self
1055            .async_items
1056            .get_mut(handle.into())
1057            .ok_or(HandleError::InvalidPendingRequestHandle(handle))? =
1058            Some(AsyncItem::PendingReq(pending_req));
1059        Ok(())
1060    }
1061
1062    // ------- Core Cache API ------
1063
1064    /// Insert a pending cache operation: CacheHandle or CacheBusyHandle
1065    pub fn insert_cache_op(&mut self, task: PendingCacheTask) -> AsyncItemHandle {
1066        self.async_items.push(Some(AsyncItem::PendingCache(task)))
1067    }
1068
1069    /// Get mutable access to a cache entry, which may require blocking until the entry is
1070    /// available.
1071    pub(crate) async fn cache_entry_mut(
1072        &mut self,
1073        handle: CacheHandle,
1074    ) -> Result<&mut CacheEntry, HandleError> {
1075        self.async_items
1076            .get_mut(handle.into())
1077            .and_then(Option::as_mut)
1078            .and_then(AsyncItem::as_pending_cache_mut)
1079            .map(PendingCacheTask::as_mut)
1080            .ok_or(HandleError::InvalidCacheHandle(handle))?
1081            .await
1082            .as_mut()
1083            .map_err(|e| {
1084                tracing::error!("in completion of cache lookup: {e}");
1085                HandleError::InvalidCacheHandle(handle)
1086            })
1087    }
1088
1089    /// Get immutable access to a cache entry, which may require blocking until the entry is
1090    /// available.
1091    pub(crate) async fn cache_entry(
1092        &mut self,
1093        handle: CacheHandle,
1094    ) -> Result<&CacheEntry, HandleError> {
1095        self.async_items
1096            .get_mut(handle.into())
1097            .and_then(Option::as_mut)
1098            .and_then(AsyncItem::as_pending_cache_mut)
1099            .map(PendingCacheTask::as_mut)
1100            .ok_or(HandleError::InvalidCacheHandle(handle))?
1101            .await
1102            .as_ref()
1103            .map_err(|e| {
1104                tracing::error!("in completion of cache lookup: {e}");
1105                HandleError::InvalidCacheHandle(handle)
1106            })
1107    }
1108
1109    /// Take ownership of a `CacheEntry` given its handle.
1110    ///
1111    /// Returns a `HandleError` if the handle is not associated with a cache lookup.
1112    pub(crate) fn take_cache_entry(
1113        &mut self,
1114        handle: CacheHandle,
1115    ) -> Result<PendingCacheTask, HandleError> {
1116        self.async_items
1117            .get_mut(handle.into())
1118            .and_then(Option::take)
1119            .and_then(AsyncItem::into_pending_cache)
1120            .ok_or(HandleError::InvalidCacheHandle(handle))
1121    }
1122
1123    /// Access the cache.
1124    pub fn cache(&self) -> &Arc<Cache> {
1125        self.ctx.cache()
1126    }
1127
1128    // -------- Scheduling APIs ----------
1129
1130    /// Take ownership of multiple AsyncItems in preparation for a `select`.
1131    ///
1132    /// Returns a [`HandleError`] if any of the handles are not associated with a pending
1133    /// request in the session.
1134    pub fn prepare_select_targets(
1135        &mut self,
1136        handles: impl IntoIterator<Item = AsyncItemHandle>,
1137    ) -> Result<Vec<SelectTarget>, HandleError> {
1138        // Prepare a vector of targets from the given handles; if any of the handles are invalid,
1139        // put back all the targets we've extracted so far
1140        let mut targets = vec![];
1141        for handle in handles {
1142            if let Ok(item) = self.take_async_item(handle) {
1143                targets.push(SelectTarget { handle, item });
1144            } else {
1145                self.reinsert_select_targets(targets);
1146                return Err(HandleError::InvalidPendingRequestHandle(handle.into()));
1147            }
1148        }
1149        Ok(targets)
1150    }
1151
1152    /// Put the given vector of `select` targets back into the pending request table, using the handles
1153    /// stored within each [`SelectTarget`].
1154    pub fn reinsert_select_targets(&mut self, targets: Vec<SelectTarget>) {
1155        for target in targets {
1156            self.reinsert_async_handle(target.handle, target.item);
1157        }
1158    }
1159
1160    pub fn reinsert_async_handle(&mut self, handle: AsyncItemHandle, item: AsyncItem) {
1161        // Invalid handle, reinsert the item.
1162        debug_assert!(self.async_items[handle].is_none());
1163        self.async_items[handle] = Some(item);
1164    }
1165
1166    pub fn new_ready(&mut self) -> AsyncItemHandle {
1167        self.async_items.push(Some(AsyncItem::Ready))
1168    }
1169
1170    /// Returns the unique identifier for the current session.
1171    ///
1172    /// While this corresponds to the request ID for the initial request that spawned
1173    /// the session, subsequent downstream requests received by the session will have
1174    /// their own unique identifier. Care should be taken to not conflate the two, and
1175    /// to use [Session::downstream_request_id] whenever a request needs to be identified.
1176    pub fn session_id(&self) -> u64 {
1177        self.session_id
1178    }
1179
1180    /// Access the path to the configuration file for this invocation.
1181    pub fn config_path(&self) -> Option<&Path> {
1182        self.ctx.config_path()
1183    }
1184
1185    pub fn async_item_mut(
1186        &mut self,
1187        handle: AsyncItemHandle,
1188    ) -> Result<&mut AsyncItem, HandleError> {
1189        match self.async_items.get_mut(handle).and_then(|ai| ai.as_mut()) {
1190            Some(item) => Ok(item),
1191            None => Err(HandleError::InvalidAsyncItemHandle(handle.into()))?,
1192        }
1193    }
1194
1195    pub fn take_async_item(&mut self, handle: AsyncItemHandle) -> Result<AsyncItem, HandleError> {
1196        // check that this is an async item before removing it
1197        let _ = self.async_item_mut(handle)?;
1198
1199        let item = self
1200            .async_items
1201            .get_mut(handle)
1202            .and_then(|tracked| tracked.take())
1203            .ok_or(HandleError::InvalidAsyncItemHandle(handle.into()))?;
1204
1205        // We just took the handle out of the table, so if it was "the"
1206        // downstream pending handle, it no longer is.
1207        if let AsyncItem::PendingDownstream(_) = item {
1208            self.downstream_pending_handle = None;
1209        }
1210
1211        Ok(item)
1212    }
1213
1214    pub async fn select_impl(
1215        &mut self,
1216        handles: impl IntoIterator<Item = AsyncItemHandle>,
1217    ) -> Result<usize, Error> {
1218        // we have to temporarily move the async items out of the session table,
1219        // because we need &mut borrows of all of them simultaneously.
1220        let targets = self.prepare_select_targets(handles)?;
1221        let mut selected = SelectedTargets::new(self, targets);
1222        let done_index = selected.future().await;
1223
1224        Ok(done_index)
1225    }
1226
1227    pub fn shielding_sites(&self) -> &ShieldingSites {
1228        self.ctx.shielding_sites()
1229    }
1230
1231    pub async fn register_pending_downstream_req(
1232        &mut self,
1233        timeout: Option<Duration>,
1234    ) -> Result<AsyncItemHandle, Error> {
1235        if self.downstream_pending_handle.is_some() {
1236            return Err(Error::LimitExceeded {
1237                msg: "Too many pending downstream request handles have been created",
1238            });
1239        }
1240
1241        let rx = if self.next_req_accepted < NEXT_REQ_ACCEPT_MAX {
1242            self.ctx.register_pending_downstream().await
1243        } else {
1244            None
1245        };
1246
1247        if rx.is_none() {
1248            self.next_req_accepted = NEXT_REQ_ACCEPT_MAX;
1249        } else {
1250            self.next_req_accepted += 1;
1251        }
1252
1253        let timeout = timeout.unwrap_or(NEXT_REQ_TIMEOUT).min(NEXT_REQ_TIMEOUT);
1254        let task = PendingDownstreamReqTask::new(rx, timeout);
1255        let handle = self.async_items.push(Some(AsyncItem::from(task)));
1256        self.downstream_pending_handle = Some(handle);
1257
1258        Ok(handle)
1259    }
1260
1261    pub fn take_pending_downstream_req(
1262        &mut self,
1263        handle: AsyncItemHandle,
1264    ) -> Result<PendingDownstreamReqTask, HandleError> {
1265        let task = self
1266            .async_items
1267            .get_mut(handle)
1268            .and_then(|maybe_item| {
1269                if maybe_item
1270                    .as_mut()
1271                    .and_then(AsyncItem::as_pending_downstream_req_mut)
1272                    .is_some()
1273                {
1274                    maybe_item
1275                        .take()
1276                        .and_then(AsyncItem::into_pending_downstream_req)
1277                } else {
1278                    None
1279                }
1280            })
1281            .ok_or_else(|| HandleError::InvalidPendingDownstreamHandle(handle.into()))?;
1282
1283        self.downstream_pending_handle = None;
1284
1285        Ok(task)
1286    }
1287
1288    /// Wait for a [PendingDownstreamReqTask] to finish, then fetch its request and body handles.
1289    pub async fn await_downstream_req(
1290        &mut self,
1291        handle: AsyncItemHandle,
1292    ) -> Result<Option<(RequestHandle, BodyHandle)>, Error> {
1293        if self.downstream_resp.is_unsent() {
1294            return Err(Error::Unsupported {
1295                msg: "cannot accept requests w/o handling the outstanding one",
1296            });
1297        }
1298
1299        let item = self.take_pending_downstream_req(handle)?;
1300        let Some(downstream) = item.recv().await? else {
1301            return Ok(None);
1302        };
1303
1304        let (parts, body) = downstream.req.into_parts();
1305        let body_handle = self.async_items.push(Some(AsyncItem::Body(body)));
1306        let req_handle = self.req_parts.push(RequestParts {
1307            parts: Some(parts),
1308            metadata: Some(downstream.metadata),
1309        });
1310
1311        self.downstream_resp = DownstreamResponseState::new(downstream.sender);
1312        self.downstream_req_handle = req_handle;
1313        self.downstream_req_body_handle = body_handle.into();
1314
1315        Ok(Some((req_handle, body_handle.into())))
1316    }
1317
1318    pub fn abandon_pending_downstream_req(
1319        &mut self,
1320        handle: AsyncItemHandle,
1321    ) -> Result<(), HandleError> {
1322        self.take_pending_downstream_req(handle).map(|_| ())
1323    }
1324
1325    pub fn ctx(&self) -> &Arc<ExecuteCtx> {
1326        &self.ctx
1327    }
1328}
1329
1330pub struct SelectedTargets<'session> {
1331    session: &'session mut Session,
1332    targets: Vec<SelectTarget>,
1333}
1334
1335impl<'session> SelectedTargets<'session> {
1336    fn new(session: &'session mut Session, targets: Vec<SelectTarget>) -> Self {
1337        Self { session, targets }
1338    }
1339
1340    fn future(&mut self) -> Box<dyn Future<Output = usize> + Unpin + Send + Sync + '_> {
1341        // for each target, we produce a future for checking on the "readiness"
1342        // of the associated primary I/O operation
1343        let mut futures = Vec::new();
1344        for target in &mut *self.targets {
1345            futures.push(Box::pin(target.item.await_ready()))
1346        }
1347        if futures.is_empty() {
1348            // if there are no futures, we wait forever; this waiting will always be bounded by a timeout,
1349            // since the `select` hostcall requires a timeout when no handles are given.
1350            Box::new(future::pending())
1351        } else {
1352            Box::new(future::select_all(futures).map(|f| f.1))
1353        }
1354    }
1355}
1356
1357impl<'session> Drop for SelectedTargets<'session> {
1358    fn drop(&mut self) {
1359        let targets = std::mem::take(&mut self.targets);
1360        self.session.reinsert_select_targets(targets);
1361    }
1362}
1363
1364/// Additional Viceroy-specific metadata for requests.
1365#[derive(Clone, Debug)]
1366pub struct ViceroyRequestMetadata {
1367    pub auto_decompress_encodings: ContentEncodings,
1368    pub framing_headers_mode: FramingHeadersMode,
1369}
1370
1371impl Default for ViceroyRequestMetadata {
1372    fn default() -> Self {
1373        ViceroyRequestMetadata {
1374            auto_decompress_encodings: ContentEncodings::empty(),
1375            framing_headers_mode: FramingHeadersMode::Automatic,
1376        }
1377    }
1378}
1379
1380/// Additional Viceroy-specific metadata for responses.
1381#[derive(Clone, Debug)]
1382pub struct ViceroyResponseMetadata {
1383    pub framing_headers_mode: FramingHeadersMode,
1384}
1385
1386impl Default for ViceroyResponseMetadata {
1387    fn default() -> Self {
1388        ViceroyResponseMetadata {
1389            framing_headers_mode: FramingHeadersMode::Automatic,
1390        }
1391    }
1392}
1393
1394#[derive(Clone, Copy, Eq, Hash, PartialEq)]
1395#[repr(transparent)]
1396pub struct AsyncItemHandle(u32);
1397
1398entity_impl!(AsyncItemHandle, "async_item");
1399
1400// The ABI uses distinct entity types for each kind of async item because most host calls operate on
1401// only one type at a type. But the underlying tables for all async items are combined, so the handles
1402// are interchangeable. Keeping them as separate types helps ensure intentional view shifts between
1403// them, using `.into()`.
1404
1405impl From<BodyHandle> for AsyncItemHandle {
1406    fn from(h: BodyHandle) -> AsyncItemHandle {
1407        AsyncItemHandle::from_u32(h.into())
1408    }
1409}
1410
1411impl From<AsyncItemHandle> for BodyHandle {
1412    fn from(h: AsyncItemHandle) -> BodyHandle {
1413        BodyHandle::from(h.as_u32())
1414    }
1415}
1416
1417impl From<PendingRequestHandle> for AsyncItemHandle {
1418    fn from(h: PendingRequestHandle) -> AsyncItemHandle {
1419        AsyncItemHandle::from_u32(h.into())
1420    }
1421}
1422
1423impl From<AsyncItemHandle> for PendingRequestHandle {
1424    fn from(h: AsyncItemHandle) -> PendingRequestHandle {
1425        PendingRequestHandle::from(h.as_u32())
1426    }
1427}
1428
1429impl From<types::AsyncItemHandle> for AsyncItemHandle {
1430    fn from(h: types::AsyncItemHandle) -> AsyncItemHandle {
1431        AsyncItemHandle::from_u32(h.into())
1432    }
1433}
1434
1435impl From<AsyncItemHandle> for types::AsyncItemHandle {
1436    fn from(h: AsyncItemHandle) -> types::AsyncItemHandle {
1437        types::AsyncItemHandle::from(h.as_u32())
1438    }
1439}
1440
1441impl From<PendingKvLookupHandle> for AsyncItemHandle {
1442    fn from(h: PendingKvLookupHandle) -> AsyncItemHandle {
1443        AsyncItemHandle::from_u32(h.into())
1444    }
1445}
1446
1447impl From<AsyncItemHandle> for PendingKvLookupHandle {
1448    fn from(h: AsyncItemHandle) -> PendingKvLookupHandle {
1449        PendingKvLookupHandle::from(h.as_u32())
1450    }
1451}
1452
1453impl From<PendingKvInsertHandle> for AsyncItemHandle {
1454    fn from(h: PendingKvInsertHandle) -> AsyncItemHandle {
1455        AsyncItemHandle::from_u32(h.into())
1456    }
1457}
1458
1459impl From<AsyncItemHandle> for PendingKvInsertHandle {
1460    fn from(h: AsyncItemHandle) -> PendingKvInsertHandle {
1461        PendingKvInsertHandle::from(h.as_u32())
1462    }
1463}
1464
1465impl From<PendingKvDeleteHandle> for AsyncItemHandle {
1466    fn from(h: PendingKvDeleteHandle) -> AsyncItemHandle {
1467        AsyncItemHandle::from_u32(h.into())
1468    }
1469}
1470
1471impl From<AsyncItemHandle> for PendingKvDeleteHandle {
1472    fn from(h: AsyncItemHandle) -> PendingKvDeleteHandle {
1473        PendingKvDeleteHandle::from(h.as_u32())
1474    }
1475}
1476
1477impl From<PendingKvListHandle> for AsyncItemHandle {
1478    fn from(h: PendingKvListHandle) -> AsyncItemHandle {
1479        AsyncItemHandle::from_u32(h.into())
1480    }
1481}
1482
1483impl From<AsyncItemHandle> for PendingKvListHandle {
1484    fn from(h: AsyncItemHandle) -> PendingKvListHandle {
1485        PendingKvListHandle::from(h.as_u32())
1486    }
1487}
1488
1489impl From<KvStoreLookupHandle> for AsyncItemHandle {
1490    fn from(h: KvStoreLookupHandle) -> AsyncItemHandle {
1491        AsyncItemHandle::from_u32(h.into())
1492    }
1493}
1494
1495impl From<AsyncItemHandle> for KvStoreLookupHandle {
1496    fn from(h: AsyncItemHandle) -> KvStoreLookupHandle {
1497        KvStoreLookupHandle::from(h.as_u32())
1498    }
1499}
1500
1501impl From<KvStoreInsertHandle> for AsyncItemHandle {
1502    fn from(h: KvStoreInsertHandle) -> AsyncItemHandle {
1503        AsyncItemHandle::from_u32(h.into())
1504    }
1505}
1506
1507impl From<AsyncItemHandle> for KvStoreInsertHandle {
1508    fn from(h: AsyncItemHandle) -> KvStoreInsertHandle {
1509        KvStoreInsertHandle::from(h.as_u32())
1510    }
1511}
1512
1513impl From<KvStoreDeleteHandle> for AsyncItemHandle {
1514    fn from(h: KvStoreDeleteHandle) -> AsyncItemHandle {
1515        AsyncItemHandle::from_u32(h.into())
1516    }
1517}
1518
1519impl From<AsyncItemHandle> for KvStoreDeleteHandle {
1520    fn from(h: AsyncItemHandle) -> KvStoreDeleteHandle {
1521        KvStoreDeleteHandle::from(h.as_u32())
1522    }
1523}
1524
1525impl From<KvStoreListHandle> for AsyncItemHandle {
1526    fn from(h: KvStoreListHandle) -> AsyncItemHandle {
1527        AsyncItemHandle::from_u32(h.into())
1528    }
1529}
1530
1531impl From<AsyncItemHandle> for KvStoreListHandle {
1532    fn from(h: AsyncItemHandle) -> KvStoreListHandle {
1533        KvStoreListHandle::from(h.as_u32())
1534    }
1535}
1536
1537impl From<AsyncItemHandle> for CacheHandle {
1538    fn from(h: AsyncItemHandle) -> CacheHandle {
1539        CacheHandle::from(h.as_u32())
1540    }
1541}
1542
1543impl From<CacheHandle> for AsyncItemHandle {
1544    fn from(h: CacheHandle) -> AsyncItemHandle {
1545        AsyncItemHandle::from_u32(h.into())
1546    }
1547}
1548
1549impl From<AsyncItemHandle> for CacheBusyHandle {
1550    fn from(h: AsyncItemHandle) -> CacheBusyHandle {
1551        CacheBusyHandle::from(h.as_u32())
1552    }
1553}
1554
1555impl From<CacheBusyHandle> for AsyncItemHandle {
1556    fn from(h: CacheBusyHandle) -> AsyncItemHandle {
1557        AsyncItemHandle::from_u32(h.into())
1558    }
1559}
1560
1561impl From<AsyncItemHandle> for RequestPromiseHandle {
1562    fn from(h: AsyncItemHandle) -> RequestPromiseHandle {
1563        RequestPromiseHandle::from(h.as_u32())
1564    }
1565}
1566
1567impl From<RequestPromiseHandle> for AsyncItemHandle {
1568    fn from(h: RequestPromiseHandle) -> AsyncItemHandle {
1569        AsyncItemHandle::from_u32(h.into())
1570    }
1571}
1572
1573// CacheBusyHandle and CacheHandle are equivalent; CacheHandle is just a "later" resolution.
1574impl From<CacheBusyHandle> for CacheHandle {
1575    fn from(h: CacheBusyHandle) -> CacheHandle {
1576        let raw: u32 = h.into();
1577        CacheHandle::from(raw)
1578    }
1579}