1mod async_item;
4mod downstream;
5
6pub use async_item::{
7 AsyncItem, PeekableTask, PendingCacheTask, PendingDownstreamReqTask, PendingKvDeleteTask,
8 PendingKvInsertTask, PendingKvListTask, PendingKvLookupTask,
9};
10
11use std::collections::HashMap;
12use std::future::Future;
13use std::io::Write;
14use std::net::IpAddr;
15use std::path::Path;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::cache::{Cache, CacheEntry};
21use crate::object_store::KvStoreError;
22use crate::wiggle_abi::types::{CacheBusyHandle, CacheHandle, FramingHeadersMode};
23
24use {
25 self::downstream::DownstreamResponseState,
26 crate::{
27 acl::Acl,
28 body::Body,
29 config::{Backend, Backends, Dictionaries, LoadedDictionary},
30 downstream::{DownstreamMetadata, DownstreamRequest},
31 error::{Error, HandleError},
32 logging::LogEndpoint,
33 object_store::{ObjectKey, ObjectStoreKey, ObjectStores, ObjectValue},
34 pushpin::PushpinRedirectInfo,
35 secret_store::{SecretLookup, SecretStores},
36 shielding_site::ShieldingSites,
37 streaming_body::StreamingBody,
38 upstream::{SelectTarget, TlsConfig},
39 wiggle_abi::types::{
40 self, AclHandle, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
41 KvInsertMode, KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle,
42 KvStoreListHandle, KvStoreLookupHandle, PendingKvDeleteHandle, PendingKvInsertHandle,
43 PendingKvListHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
44 RequestPromiseHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
45 },
46 ExecuteCtx,
47 },
48 cranelift_entity::{entity_impl, PrimaryMap},
49 futures::future::{self, FutureExt},
50 http::{request, response, HeaderMap, Response},
51};
52
53const NEXT_REQ_ACCEPT_MAX: usize = 5;
54const NEXT_REQ_TIMEOUT: Duration = Duration::from_secs(10);
55const NGWAF_ALLOW_VERDICT: &str = "allow";
56
57pub struct RequestParts {
58 parts: Option<request::Parts>,
59 metadata: Option<DownstreamMetadata>,
60}
61
62pub struct Session {
65 session_id: u64,
66 pub active_cpu_time_us: Arc<AtomicU64>,
68 downstream_req_handle: RequestHandle,
71 downstream_req_body_handle: BodyHandle,
74 downstream_resp: DownstreamResponseState,
78 downstream_pending_handle: Option<AsyncItemHandle>,
80 async_items: PrimaryMap<AsyncItemHandle, Option<AsyncItem>>,
83 ctx: Arc<ExecuteCtx>,
85 req_parts: PrimaryMap<RequestHandle, RequestParts>,
90 resp_parts: PrimaryMap<ResponseHandle, Option<response::Parts>>,
95 capture_logs: Arc<Mutex<dyn Write + Send>>,
97 log_endpoints: PrimaryMap<EndpointHandle, LogEndpoint>,
99 log_endpoints_by_name: HashMap<Vec<u8>, EndpointHandle>,
101 acl_handles: PrimaryMap<AclHandle, Arc<Acl>>,
103 ngwaf_verdict: String,
105 dynamic_backends: Backends,
109 loaded_dictionaries: PrimaryMap<DictionaryHandle, LoadedDictionary>,
111 kv_store_by_name: PrimaryMap<KvStoreHandle, ObjectStoreKey>,
115 secret_stores_by_name: PrimaryMap<SecretStoreHandle, String>,
119 secrets_by_name: PrimaryMap<SecretHandle, SecretLookup>,
123 next_req_accepted: usize,
125}
126
127impl Session {
128 #[allow(clippy::too_many_arguments)]
130 pub fn new(
131 downstream: DownstreamRequest,
132 active_cpu_time_us: Arc<AtomicU64>,
133 ctx: Arc<ExecuteCtx>,
134 ) -> Session {
135 let (parts, body) = downstream.req.into_parts();
136
137 let mut async_items: PrimaryMap<AsyncItemHandle, Option<AsyncItem>> = PrimaryMap::new();
138 let mut req_parts = PrimaryMap::new();
139
140 let session_id = downstream.metadata.req_id;
141 let downstream_req_handle = req_parts.push(RequestParts {
142 parts: Some(parts),
143 metadata: Some(downstream.metadata),
144 });
145 let downstream_req_body_handle = async_items.push(Some(AsyncItem::Body(body))).into();
146
147 Session {
148 session_id,
149 downstream_req_handle,
150 downstream_req_body_handle,
151 active_cpu_time_us,
152 async_items,
153 req_parts,
154 resp_parts: PrimaryMap::new(),
155 downstream_resp: DownstreamResponseState::new(downstream.sender),
156 capture_logs: ctx.capture_logs(),
157 log_endpoints: PrimaryMap::new(),
158 log_endpoints_by_name: HashMap::new(),
159 acl_handles: PrimaryMap::new(),
160 ngwaf_verdict: NGWAF_ALLOW_VERDICT.to_string(),
161 dynamic_backends: Backends::default(),
162 loaded_dictionaries: PrimaryMap::new(),
163 kv_store_by_name: PrimaryMap::new(),
164 secret_stores_by_name: PrimaryMap::new(),
165 secrets_by_name: PrimaryMap::new(),
166 downstream_pending_handle: None,
167 next_req_accepted: 0,
168
169 ctx,
170 }
171 }
172
173 pub fn downstream_metadata(
177 &self,
178 handle: RequestHandle,
179 ) -> Result<Option<&DownstreamMetadata>, HandleError> {
180 self.req_parts
181 .get(handle)
182 .ok_or(HandleError::InvalidRequestHandle(handle))
183 .map(|r| r.metadata.as_ref())
184 }
185
186 pub fn downstream_client_ip(
188 &self,
189 handle: RequestHandle,
190 ) -> Result<Option<IpAddr>, HandleError> {
191 Ok(self
192 .downstream_metadata(handle)?
193 .map(|md| md.client_addr.ip()))
194 }
195
196 pub fn downstream_server_ip(
198 &self,
199 handle: RequestHandle,
200 ) -> Result<Option<IpAddr>, HandleError> {
201 Ok(self
202 .downstream_metadata(handle)?
203 .map(|md| md.server_addr.ip()))
204 }
205
206 pub fn downstream_compliance_region(
208 &self,
209 handle: RequestHandle,
210 ) -> Result<Option<&str>, HandleError> {
211 Ok(self
212 .downstream_metadata(handle)?
213 .map(|md| md.compliance_region.as_str()))
214 }
215
216 pub fn downstream_request_id(&self, handle: RequestHandle) -> Result<Option<u64>, HandleError> {
218 Ok(self.downstream_metadata(handle)?.map(|md| md.req_id))
219 }
220
221 pub fn downstream_request(&self) -> RequestHandle {
223 self.downstream_req_handle
224 }
225
226 pub fn downstream_request_body(&self) -> BodyHandle {
228 self.downstream_req_body_handle
229 }
230
231 pub fn downstream_original_headers(
233 &self,
234 handle: RequestHandle,
235 ) -> Result<Option<&HeaderMap>, HandleError> {
236 Ok(self
237 .downstream_metadata(handle)?
238 .map(|md| &md.original_headers))
239 }
240
241 pub fn send_downstream_response(&mut self, resp: Response<Body>) -> Result<(), Error> {
252 self.downstream_resp.send(resp)
253 }
254
255 pub fn redirect_downstream_to_pushpin(
264 &mut self,
265 redirect_info: PushpinRedirectInfo,
266 ) -> Result<(), Error> {
267 self.downstream_resp.redirect_to_pushpin(redirect_info)
268 }
269
270 pub fn close_downstream_response_sender(&mut self, resp: Response<Body>) {
273 let _ = self.downstream_resp.send(resp);
274 }
275
276 pub fn insert_body(&mut self, body: Body) -> BodyHandle {
286 self.async_items.push(Some(AsyncItem::Body(body))).into()
287 }
288
289 pub fn body(&self, handle: BodyHandle) -> Result<&Body, HandleError> {
297 self.async_items
298 .get(handle.into())
299 .and_then(Option::as_ref)
300 .and_then(AsyncItem::as_body)
301 .ok_or(HandleError::InvalidBodyHandle(handle))
302 }
303
304 pub fn body_mut(&mut self, handle: BodyHandle) -> Result<&mut Body, HandleError> {
312 self.async_items
313 .get_mut(handle.into())
314 .and_then(Option::as_mut)
315 .and_then(AsyncItem::as_body_mut)
316 .ok_or(HandleError::InvalidBodyHandle(handle))
317 }
318
319 pub fn take_body(&mut self, handle: BodyHandle) -> Result<Body, HandleError> {
327 self.async_items
328 .get_mut(handle.into())
329 .and_then(Option::take)
330 .and_then(AsyncItem::into_body)
331 .ok_or(HandleError::InvalidBodyHandle(handle))
332 }
333
334 pub fn drop_body(&mut self, handle: BodyHandle) -> Result<(), HandleError> {
338 self.async_items
339 .get_mut(handle.into())
340 .and_then(Option::take)
341 .map(drop)
342 .ok_or(HandleError::InvalidBodyHandle(handle))
343 }
344
345 pub fn begin_streaming(&mut self, handle: BodyHandle) -> Result<Body, HandleError> {
353 self.async_items
354 .get_mut(handle.into())
355 .and_then(Option::as_mut)
356 .and_then(AsyncItem::begin_streaming)
357 .ok_or(HandleError::InvalidBodyHandle(handle))
358 }
359
360 pub fn is_streaming_body(&self, handle: BodyHandle) -> bool {
365 if let Some(Some(body)) = self.async_items.get(handle.into()) {
366 body.is_streaming()
367 } else {
368 false
369 }
370 }
371
372 pub fn streaming_body_mut(
382 &mut self,
383 handle: BodyHandle,
384 ) -> Result<&mut StreamingBody, HandleError> {
385 self.async_items
386 .get_mut(handle.into())
387 .and_then(Option::as_mut)
388 .and_then(AsyncItem::as_streaming_mut)
389 .ok_or(HandleError::InvalidBodyHandle(handle))
390 }
391
392 pub fn take_streaming_body(
402 &mut self,
403 handle: BodyHandle,
404 ) -> Result<StreamingBody, HandleError> {
405 self.async_items
406 .get_mut(handle.into())
407 .and_then(Option::take)
408 .and_then(AsyncItem::into_streaming)
409 .ok_or(HandleError::InvalidBodyHandle(handle))
410 }
411
412 pub fn insert_request_parts(&mut self, parts: request::Parts) -> RequestHandle {
423 self.req_parts.push(RequestParts {
424 parts: Some(parts),
425 metadata: None,
426 })
427 }
428
429 pub fn request_parts(&self, handle: RequestHandle) -> Result<&request::Parts, HandleError> {
440 self.req_parts
441 .get(handle)
442 .and_then(|r| r.parts.as_ref())
443 .ok_or(HandleError::InvalidRequestHandle(handle))
444 }
445
446 pub fn request_parts_mut(
457 &mut self,
458 handle: RequestHandle,
459 ) -> Result<&mut request::Parts, HandleError> {
460 self.req_parts
461 .get_mut(handle)
462 .and_then(|r| r.parts.as_mut())
463 .ok_or(HandleError::InvalidRequestHandle(handle))
464 }
465
466 pub fn take_request_parts(
477 &mut self,
478 handle: RequestHandle,
479 ) -> Result<request::Parts, HandleError> {
480 self.req_parts
481 .get_mut(handle)
482 .and_then(|r| r.parts.take())
483 .ok_or(HandleError::InvalidRequestHandle(handle))
484 }
485
486 pub fn insert_response_parts(&mut self, parts: response::Parts) -> ResponseHandle {
497 self.resp_parts.push(Some(parts))
498 }
499
500 pub fn response_parts(&self, handle: ResponseHandle) -> Result<&response::Parts, HandleError> {
511 self.resp_parts
512 .get(handle)
513 .and_then(Option::as_ref)
514 .ok_or(HandleError::InvalidResponseHandle(handle))
515 }
516
517 pub fn response_parts_mut(
528 &mut self,
529 handle: ResponseHandle,
530 ) -> Result<&mut response::Parts, HandleError> {
531 self.resp_parts
532 .get_mut(handle)
533 .and_then(Option::as_mut)
534 .ok_or(HandleError::InvalidResponseHandle(handle))
535 }
536
537 pub fn take_response_parts(
548 &mut self,
549 handle: ResponseHandle,
550 ) -> Result<response::Parts, HandleError> {
551 self.resp_parts
552 .get_mut(handle)
553 .and_then(Option::take)
554 .ok_or(HandleError::InvalidResponseHandle(handle))
555 }
556
557 pub fn insert_response(&mut self, resp: Response<Body>) -> (ResponseHandle, BodyHandle) {
558 let (resp_parts, resp_body) = resp.into_parts();
559 let resp_handle = self.insert_response_parts(resp_parts);
560 let body_handle = self.insert_body(resp_body);
561 (resp_handle, body_handle)
562 }
563
564 pub fn log_endpoint_handle(&mut self, name: &[u8]) -> EndpointHandle {
573 if let Some(handle) = self.log_endpoints_by_name.get(name).copied() {
574 return handle;
575 }
576 let endpoint = LogEndpoint::new(name, self.capture_logs.clone());
577 let handle = self.log_endpoints.push(endpoint);
578 self.log_endpoints_by_name.insert(name.to_owned(), handle);
579 handle
580 }
581
582 pub fn log_endpoint(&self, handle: EndpointHandle) -> Result<&LogEndpoint, HandleError> {
591 self.log_endpoints
592 .get(handle)
593 .ok_or(HandleError::InvalidEndpointHandle(handle))
594 }
595
596 pub fn acl_handle_by_name(&mut self, name: &str) -> Option<AclHandle> {
599 let acl = self.ctx.acls().get_acl(name)?;
600 Some(self.acl_handles.push(acl.clone()))
601 }
602
603 pub fn acl_by_handle(&self, handle: AclHandle) -> Option<Arc<Acl>> {
604 self.acl_handles.get(handle).map(Arc::clone)
605 }
606
607 pub fn backends(&self) -> &Backends {
611 self.ctx.backends()
612 }
613
614 pub fn backend(&self, name: &str) -> Option<&Arc<Backend>> {
616 self.backends()
619 .get(name)
620 .or_else(|| self.dynamic_backends.get(name))
621 }
622
623 pub fn dynamic_backend(&self, name: &str) -> Option<&Arc<Backend>> {
625 self.dynamic_backends.get(name)
626 }
627
628 pub fn backend_names(&self) -> impl Iterator<Item = &String> {
630 self.backends().keys().chain(self.dynamic_backends.keys())
631 }
632
633 pub fn add_backend(&mut self, name: &str, info: Backend) -> bool {
637 if self.backends().contains_key(name) || self.dynamic_backends.contains_key(name) {
639 return false;
640 }
641
642 self.dynamic_backends
643 .insert(name.to_string(), Arc::new(info));
644
645 true
646 }
647
648 pub fn tls_config(&self) -> &TlsConfig {
652 self.ctx.tls_config()
653 }
654
655 pub fn device_detection_lookup(&self, user_agent: &str) -> Option<String> {
658 self.ctx
659 .device_detection()
660 .lookup(user_agent)
661 .map(|data| data.to_string())
662 }
663
664 pub fn dictionary_handle(&mut self, name: &str) -> Result<DictionaryHandle, Error> {
668 if let Some(dict) = self.dictionaries().get(name) {
669 let loaded = dict.load().map_err(|err| Error::Other(err.into()))?;
670 Ok(self.loaded_dictionaries.push(loaded))
671 } else {
672 Err(Error::DictionaryError(
673 crate::wiggle_abi::DictionaryError::UnknownDictionary(name.to_owned()),
674 ))
675 }
676 }
677
678 pub fn dictionary(&self, handle: DictionaryHandle) -> Result<&LoadedDictionary, HandleError> {
680 self.loaded_dictionaries
681 .get(handle)
682 .ok_or(HandleError::InvalidDictionaryHandle(handle))
683 }
684
685 pub fn dictionaries(&self) -> &Dictionaries {
687 self.ctx.dictionaries()
688 }
689
690 pub fn geolocation_lookup(&self, addr: &IpAddr) -> Option<String> {
693 self.ctx
694 .geolocation()
695 .lookup(addr)
696 .map(|data| data.to_string())
697 }
698
699 pub fn ngwaf_response(&self) -> String {
703 format!(
704 r#"{{"waf_response":200,"redirect_url":"","tags":[],"verdict":"{}","decision_ms":0}}"#,
705 self.ngwaf_verdict
706 )
707 }
708
709 pub fn kv_store(&self) -> &ObjectStores {
712 self.ctx.object_store()
713 }
714
715 pub fn kv_store_handle(&mut self, key: &str) -> KvStoreHandle {
716 let obj_key = ObjectStoreKey::new(key);
717 self.kv_store_by_name.push(obj_key)
718 }
719
720 pub fn get_kv_store_key(&self, handle: KvStoreHandle) -> Option<&ObjectStoreKey> {
721 self.kv_store_by_name.get(handle)
722 }
723
724 pub fn kv_insert(
725 &self,
726 obj_store_key: ObjectStoreKey,
727 obj_key: ObjectKey,
728 obj: Vec<u8>,
729 mode: Option<KvInsertMode>,
730 generation: Option<u64>,
731 metadata: Option<String>,
732 ttl: Option<Duration>,
733 ) -> Result<(), KvStoreError> {
734 let mode = match mode {
735 None => KvInsertMode::Overwrite,
736 Some(m) => m,
737 };
738
739 self.kv_store()
740 .insert(obj_store_key, obj_key, obj, mode, generation, metadata, ttl)
741 }
742
743 pub fn insert_pending_kv_insert(
748 &mut self,
749 pending: PendingKvInsertTask,
750 ) -> KvStoreInsertHandle {
751 self.async_items
752 .push(Some(AsyncItem::PendingKvInsert(pending)))
753 .into()
754 }
755
756 pub fn take_pending_kv_insert(
761 &mut self,
762 handle: PendingKvInsertHandle,
763 ) -> Result<PendingKvInsertTask, HandleError> {
764 let _ = self.pending_kv_insert(handle)?;
766
767 self.async_items
768 .get_mut(handle.into())
769 .and_then(Option::take)
770 .and_then(AsyncItem::into_pending_kv_insert)
771 .ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
772 }
773
774 pub fn pending_kv_insert(
779 &self,
780 handle: PendingKvInsertHandle,
781 ) -> Result<&PendingKvInsertTask, HandleError> {
782 self.async_items
783 .get(handle.into())
784 .and_then(Option::as_ref)
785 .and_then(AsyncItem::as_pending_kv_insert)
786 .ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
787 }
788
789 pub fn kv_delete(
790 &self,
791 obj_store_key: ObjectStoreKey,
792 obj_key: ObjectKey,
793 ) -> Result<bool, KvStoreError> {
794 self.kv_store().delete(obj_store_key, obj_key)
795 }
796
797 pub fn insert_pending_kv_delete(
802 &mut self,
803 pending: PendingKvDeleteTask,
804 ) -> PendingKvDeleteHandle {
805 self.async_items
806 .push(Some(AsyncItem::PendingKvDelete(pending)))
807 .into()
808 }
809
810 pub fn take_pending_kv_delete(
815 &mut self,
816 handle: PendingKvDeleteHandle,
817 ) -> Result<PendingKvDeleteTask, HandleError> {
818 let _ = self.pending_kv_delete(handle)?;
820
821 self.async_items
822 .get_mut(handle.into())
823 .and_then(Option::take)
824 .and_then(AsyncItem::into_pending_kv_delete)
825 .ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
826 }
827
828 pub fn pending_kv_delete(
833 &self,
834 handle: PendingKvDeleteHandle,
835 ) -> Result<&PendingKvDeleteTask, HandleError> {
836 self.async_items
837 .get(handle.into())
838 .and_then(Option::as_ref)
839 .and_then(AsyncItem::as_pending_kv_delete)
840 .ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
841 }
842
843 pub fn obj_lookup(
844 &self,
845 obj_store_key: ObjectStoreKey,
846 obj_key: ObjectKey,
847 ) -> Result<Option<ObjectValue>, KvStoreError> {
848 self.kv_store().lookup(obj_store_key, obj_key)
849 }
850
851 pub fn insert_pending_kv_lookup(
856 &mut self,
857 pending: PendingKvLookupTask,
858 ) -> PendingKvLookupHandle {
859 self.async_items
860 .push(Some(AsyncItem::PendingKvLookup(pending)))
861 .into()
862 }
863
864 pub fn take_pending_kv_lookup(
869 &mut self,
870 handle: PendingKvLookupHandle,
871 ) -> Result<PendingKvLookupTask, HandleError> {
872 let _ = self.pending_kv_lookup(handle)?;
874
875 self.async_items
876 .get_mut(handle.into())
877 .and_then(Option::take)
878 .and_then(AsyncItem::into_pending_kv_lookup)
879 .ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
880 }
881
882 pub fn pending_kv_lookup(
887 &self,
888 handle: PendingKvLookupHandle,
889 ) -> Result<&PendingKvLookupTask, HandleError> {
890 self.async_items
891 .get(handle.into())
892 .and_then(Option::as_ref)
893 .and_then(AsyncItem::as_pending_kv_lookup)
894 .ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
895 }
896
897 pub fn kv_list(
898 &self,
899 obj_store_key: ObjectStoreKey,
900 cursor: Option<String>,
901 prefix: Option<String>,
902 limit: Option<u32>,
903 ) -> Result<Vec<u8>, KvStoreError> {
904 let limit = limit.unwrap_or(1000);
905
906 self.kv_store().list(obj_store_key, cursor, prefix, limit)
907 }
908
909 pub fn insert_pending_kv_list(&mut self, pending: PendingKvListTask) -> PendingKvListHandle {
914 self.async_items
915 .push(Some(AsyncItem::PendingKvList(pending)))
916 .into()
917 }
918
919 pub fn take_pending_kv_list(
924 &mut self,
925 handle: PendingKvListHandle,
926 ) -> Result<PendingKvListTask, HandleError> {
927 let _ = self.pending_kv_list(handle)?;
929
930 self.async_items
931 .get_mut(handle.into())
932 .and_then(Option::take)
933 .and_then(AsyncItem::into_pending_kv_list)
934 .ok_or(HandleError::InvalidPendingKvListHandle(handle))
935 }
936
937 pub fn pending_kv_list(
942 &self,
943 handle: PendingKvListHandle,
944 ) -> Result<&PendingKvListTask, HandleError> {
945 self.async_items
946 .get(handle.into())
947 .and_then(Option::as_ref)
948 .and_then(AsyncItem::as_pending_kv_list)
949 .ok_or(HandleError::InvalidPendingKvListHandle(handle))
950 }
951
952 pub fn secret_store_handle(&mut self, name: &str) -> Option<SecretStoreHandle> {
955 self.secret_stores().get_store(name)?;
956 Some(self.secret_stores_by_name.push(name.to_string()))
957 }
958
959 pub fn secret_store_name(&self, handle: SecretStoreHandle) -> Option<String> {
960 self.secret_stores_by_name.get(handle).cloned()
961 }
962
963 pub fn secret_handle(&mut self, store_name: &str, secret_name: &str) -> Option<SecretHandle> {
964 self.secret_stores()
965 .get_store(store_name)?
966 .get_secret(secret_name)?;
967 Some(self.secrets_by_name.push(SecretLookup::Standard {
968 store_name: store_name.to_string(),
969 secret_name: secret_name.to_string(),
970 }))
971 }
972
973 pub fn secret_lookup(&self, handle: SecretHandle) -> Option<SecretLookup> {
974 self.secrets_by_name.get(handle).cloned()
975 }
976
977 pub fn add_secret(&mut self, plaintext: Vec<u8>) -> SecretHandle {
978 self.secrets_by_name
979 .push(SecretLookup::Injected { plaintext })
980 }
981
982 pub fn secret_stores(&self) -> &SecretStores {
983 self.ctx.secret_stores()
984 }
985
986 pub fn insert_pending_request(
993 &mut self,
994 pending: PeekableTask<Response<Body>>,
995 ) -> PendingRequestHandle {
996 self.async_items
997 .push(Some(AsyncItem::PendingReq(pending)))
998 .into()
999 }
1000
1001 pub fn pending_request(
1006 &self,
1007 handle: PendingRequestHandle,
1008 ) -> Result<&PeekableTask<Response<Body>>, HandleError> {
1009 self.async_items
1010 .get(handle.into())
1011 .and_then(Option::as_ref)
1012 .and_then(AsyncItem::as_pending_req)
1013 .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1014 }
1015
1016 pub fn pending_request_mut(
1021 &mut self,
1022 handle: PendingRequestHandle,
1023 ) -> Result<&mut PeekableTask<Response<Body>>, HandleError> {
1024 self.async_items
1025 .get_mut(handle.into())
1026 .and_then(Option::as_mut)
1027 .and_then(AsyncItem::as_pending_req_mut)
1028 .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1029 }
1030
1031 pub fn take_pending_request(
1036 &mut self,
1037 handle: PendingRequestHandle,
1038 ) -> Result<PeekableTask<Response<Body>>, HandleError> {
1039 let _ = self.pending_request(handle)?;
1041
1042 self.async_items
1043 .get_mut(handle.into())
1044 .and_then(Option::take)
1045 .and_then(AsyncItem::into_pending_req)
1046 .ok_or(HandleError::InvalidPendingRequestHandle(handle))
1047 }
1048
1049 pub fn reinsert_pending_request(
1050 &mut self,
1051 handle: PendingRequestHandle,
1052 pending_req: PeekableTask<Response<Body>>,
1053 ) -> Result<(), HandleError> {
1054 *self
1055 .async_items
1056 .get_mut(handle.into())
1057 .ok_or(HandleError::InvalidPendingRequestHandle(handle))? =
1058 Some(AsyncItem::PendingReq(pending_req));
1059 Ok(())
1060 }
1061
1062 pub fn insert_cache_op(&mut self, task: PendingCacheTask) -> AsyncItemHandle {
1066 self.async_items.push(Some(AsyncItem::PendingCache(task)))
1067 }
1068
1069 pub(crate) async fn cache_entry_mut(
1072 &mut self,
1073 handle: CacheHandle,
1074 ) -> Result<&mut CacheEntry, HandleError> {
1075 self.async_items
1076 .get_mut(handle.into())
1077 .and_then(Option::as_mut)
1078 .and_then(AsyncItem::as_pending_cache_mut)
1079 .map(PendingCacheTask::as_mut)
1080 .ok_or(HandleError::InvalidCacheHandle(handle))?
1081 .await
1082 .as_mut()
1083 .map_err(|e| {
1084 tracing::error!("in completion of cache lookup: {e}");
1085 HandleError::InvalidCacheHandle(handle)
1086 })
1087 }
1088
1089 pub(crate) async fn cache_entry(
1092 &mut self,
1093 handle: CacheHandle,
1094 ) -> Result<&CacheEntry, HandleError> {
1095 self.async_items
1096 .get_mut(handle.into())
1097 .and_then(Option::as_mut)
1098 .and_then(AsyncItem::as_pending_cache_mut)
1099 .map(PendingCacheTask::as_mut)
1100 .ok_or(HandleError::InvalidCacheHandle(handle))?
1101 .await
1102 .as_ref()
1103 .map_err(|e| {
1104 tracing::error!("in completion of cache lookup: {e}");
1105 HandleError::InvalidCacheHandle(handle)
1106 })
1107 }
1108
1109 pub(crate) fn take_cache_entry(
1113 &mut self,
1114 handle: CacheHandle,
1115 ) -> Result<PendingCacheTask, HandleError> {
1116 self.async_items
1117 .get_mut(handle.into())
1118 .and_then(Option::take)
1119 .and_then(AsyncItem::into_pending_cache)
1120 .ok_or(HandleError::InvalidCacheHandle(handle))
1121 }
1122
1123 pub fn cache(&self) -> &Arc<Cache> {
1125 self.ctx.cache()
1126 }
1127
1128 pub fn prepare_select_targets(
1135 &mut self,
1136 handles: impl IntoIterator<Item = AsyncItemHandle>,
1137 ) -> Result<Vec<SelectTarget>, HandleError> {
1138 let mut targets = vec![];
1141 for handle in handles {
1142 if let Ok(item) = self.take_async_item(handle) {
1143 targets.push(SelectTarget { handle, item });
1144 } else {
1145 self.reinsert_select_targets(targets);
1146 return Err(HandleError::InvalidPendingRequestHandle(handle.into()));
1147 }
1148 }
1149 Ok(targets)
1150 }
1151
1152 pub fn reinsert_select_targets(&mut self, targets: Vec<SelectTarget>) {
1155 for target in targets {
1156 self.reinsert_async_handle(target.handle, target.item);
1157 }
1158 }
1159
1160 pub fn reinsert_async_handle(&mut self, handle: AsyncItemHandle, item: AsyncItem) {
1161 debug_assert!(self.async_items[handle].is_none());
1163 self.async_items[handle] = Some(item);
1164 }
1165
1166 pub fn new_ready(&mut self) -> AsyncItemHandle {
1167 self.async_items.push(Some(AsyncItem::Ready))
1168 }
1169
1170 pub fn session_id(&self) -> u64 {
1177 self.session_id
1178 }
1179
1180 pub fn config_path(&self) -> Option<&Path> {
1182 self.ctx.config_path()
1183 }
1184
1185 pub fn async_item_mut(
1186 &mut self,
1187 handle: AsyncItemHandle,
1188 ) -> Result<&mut AsyncItem, HandleError> {
1189 match self.async_items.get_mut(handle).and_then(|ai| ai.as_mut()) {
1190 Some(item) => Ok(item),
1191 None => Err(HandleError::InvalidAsyncItemHandle(handle.into()))?,
1192 }
1193 }
1194
1195 pub fn take_async_item(&mut self, handle: AsyncItemHandle) -> Result<AsyncItem, HandleError> {
1196 let _ = self.async_item_mut(handle)?;
1198
1199 let item = self
1200 .async_items
1201 .get_mut(handle)
1202 .and_then(|tracked| tracked.take())
1203 .ok_or(HandleError::InvalidAsyncItemHandle(handle.into()))?;
1204
1205 if let AsyncItem::PendingDownstream(_) = item {
1208 self.downstream_pending_handle = None;
1209 }
1210
1211 Ok(item)
1212 }
1213
1214 pub async fn select_impl(
1215 &mut self,
1216 handles: impl IntoIterator<Item = AsyncItemHandle>,
1217 ) -> Result<usize, Error> {
1218 let targets = self.prepare_select_targets(handles)?;
1221 let mut selected = SelectedTargets::new(self, targets);
1222 let done_index = selected.future().await;
1223
1224 Ok(done_index)
1225 }
1226
1227 pub fn shielding_sites(&self) -> &ShieldingSites {
1228 self.ctx.shielding_sites()
1229 }
1230
1231 pub async fn register_pending_downstream_req(
1232 &mut self,
1233 timeout: Option<Duration>,
1234 ) -> Result<AsyncItemHandle, Error> {
1235 if self.downstream_pending_handle.is_some() {
1236 return Err(Error::LimitExceeded {
1237 msg: "Too many pending downstream request handles have been created",
1238 });
1239 }
1240
1241 let rx = if self.next_req_accepted < NEXT_REQ_ACCEPT_MAX {
1242 self.ctx.register_pending_downstream().await
1243 } else {
1244 None
1245 };
1246
1247 if rx.is_none() {
1248 self.next_req_accepted = NEXT_REQ_ACCEPT_MAX;
1249 } else {
1250 self.next_req_accepted += 1;
1251 }
1252
1253 let timeout = timeout.unwrap_or(NEXT_REQ_TIMEOUT).min(NEXT_REQ_TIMEOUT);
1254 let task = PendingDownstreamReqTask::new(rx, timeout);
1255 let handle = self.async_items.push(Some(AsyncItem::from(task)));
1256 self.downstream_pending_handle = Some(handle);
1257
1258 Ok(handle)
1259 }
1260
1261 pub fn take_pending_downstream_req(
1262 &mut self,
1263 handle: AsyncItemHandle,
1264 ) -> Result<PendingDownstreamReqTask, HandleError> {
1265 let task = self
1266 .async_items
1267 .get_mut(handle)
1268 .and_then(|maybe_item| {
1269 if maybe_item
1270 .as_mut()
1271 .and_then(AsyncItem::as_pending_downstream_req_mut)
1272 .is_some()
1273 {
1274 maybe_item
1275 .take()
1276 .and_then(AsyncItem::into_pending_downstream_req)
1277 } else {
1278 None
1279 }
1280 })
1281 .ok_or_else(|| HandleError::InvalidPendingDownstreamHandle(handle.into()))?;
1282
1283 self.downstream_pending_handle = None;
1284
1285 Ok(task)
1286 }
1287
1288 pub async fn await_downstream_req(
1290 &mut self,
1291 handle: AsyncItemHandle,
1292 ) -> Result<Option<(RequestHandle, BodyHandle)>, Error> {
1293 if self.downstream_resp.is_unsent() {
1294 return Err(Error::Unsupported {
1295 msg: "cannot accept requests w/o handling the outstanding one",
1296 });
1297 }
1298
1299 let item = self.take_pending_downstream_req(handle)?;
1300 let Some(downstream) = item.recv().await? else {
1301 return Ok(None);
1302 };
1303
1304 let (parts, body) = downstream.req.into_parts();
1305 let body_handle = self.async_items.push(Some(AsyncItem::Body(body)));
1306 let req_handle = self.req_parts.push(RequestParts {
1307 parts: Some(parts),
1308 metadata: Some(downstream.metadata),
1309 });
1310
1311 self.downstream_resp = DownstreamResponseState::new(downstream.sender);
1312 self.downstream_req_handle = req_handle;
1313 self.downstream_req_body_handle = body_handle.into();
1314
1315 Ok(Some((req_handle, body_handle.into())))
1316 }
1317
1318 pub fn abandon_pending_downstream_req(
1319 &mut self,
1320 handle: AsyncItemHandle,
1321 ) -> Result<(), HandleError> {
1322 self.take_pending_downstream_req(handle).map(|_| ())
1323 }
1324
1325 pub fn ctx(&self) -> &Arc<ExecuteCtx> {
1326 &self.ctx
1327 }
1328}
1329
1330pub struct SelectedTargets<'session> {
1331 session: &'session mut Session,
1332 targets: Vec<SelectTarget>,
1333}
1334
1335impl<'session> SelectedTargets<'session> {
1336 fn new(session: &'session mut Session, targets: Vec<SelectTarget>) -> Self {
1337 Self { session, targets }
1338 }
1339
1340 fn future(&mut self) -> Box<dyn Future<Output = usize> + Unpin + Send + Sync + '_> {
1341 let mut futures = Vec::new();
1344 for target in &mut *self.targets {
1345 futures.push(Box::pin(target.item.await_ready()))
1346 }
1347 if futures.is_empty() {
1348 Box::new(future::pending())
1351 } else {
1352 Box::new(future::select_all(futures).map(|f| f.1))
1353 }
1354 }
1355}
1356
1357impl<'session> Drop for SelectedTargets<'session> {
1358 fn drop(&mut self) {
1359 let targets = std::mem::take(&mut self.targets);
1360 self.session.reinsert_select_targets(targets);
1361 }
1362}
1363
1364#[derive(Clone, Debug)]
1366pub struct ViceroyRequestMetadata {
1367 pub auto_decompress_encodings: ContentEncodings,
1368 pub framing_headers_mode: FramingHeadersMode,
1369}
1370
1371impl Default for ViceroyRequestMetadata {
1372 fn default() -> Self {
1373 ViceroyRequestMetadata {
1374 auto_decompress_encodings: ContentEncodings::empty(),
1375 framing_headers_mode: FramingHeadersMode::Automatic,
1376 }
1377 }
1378}
1379
1380#[derive(Clone, Debug)]
1382pub struct ViceroyResponseMetadata {
1383 pub framing_headers_mode: FramingHeadersMode,
1384}
1385
1386impl Default for ViceroyResponseMetadata {
1387 fn default() -> Self {
1388 ViceroyResponseMetadata {
1389 framing_headers_mode: FramingHeadersMode::Automatic,
1390 }
1391 }
1392}
1393
1394#[derive(Clone, Copy, Eq, Hash, PartialEq)]
1395#[repr(transparent)]
1396pub struct AsyncItemHandle(u32);
1397
1398entity_impl!(AsyncItemHandle, "async_item");
1399
1400impl From<BodyHandle> for AsyncItemHandle {
1406 fn from(h: BodyHandle) -> AsyncItemHandle {
1407 AsyncItemHandle::from_u32(h.into())
1408 }
1409}
1410
1411impl From<AsyncItemHandle> for BodyHandle {
1412 fn from(h: AsyncItemHandle) -> BodyHandle {
1413 BodyHandle::from(h.as_u32())
1414 }
1415}
1416
1417impl From<PendingRequestHandle> for AsyncItemHandle {
1418 fn from(h: PendingRequestHandle) -> AsyncItemHandle {
1419 AsyncItemHandle::from_u32(h.into())
1420 }
1421}
1422
1423impl From<AsyncItemHandle> for PendingRequestHandle {
1424 fn from(h: AsyncItemHandle) -> PendingRequestHandle {
1425 PendingRequestHandle::from(h.as_u32())
1426 }
1427}
1428
1429impl From<types::AsyncItemHandle> for AsyncItemHandle {
1430 fn from(h: types::AsyncItemHandle) -> AsyncItemHandle {
1431 AsyncItemHandle::from_u32(h.into())
1432 }
1433}
1434
1435impl From<AsyncItemHandle> for types::AsyncItemHandle {
1436 fn from(h: AsyncItemHandle) -> types::AsyncItemHandle {
1437 types::AsyncItemHandle::from(h.as_u32())
1438 }
1439}
1440
1441impl From<PendingKvLookupHandle> for AsyncItemHandle {
1442 fn from(h: PendingKvLookupHandle) -> AsyncItemHandle {
1443 AsyncItemHandle::from_u32(h.into())
1444 }
1445}
1446
1447impl From<AsyncItemHandle> for PendingKvLookupHandle {
1448 fn from(h: AsyncItemHandle) -> PendingKvLookupHandle {
1449 PendingKvLookupHandle::from(h.as_u32())
1450 }
1451}
1452
1453impl From<PendingKvInsertHandle> for AsyncItemHandle {
1454 fn from(h: PendingKvInsertHandle) -> AsyncItemHandle {
1455 AsyncItemHandle::from_u32(h.into())
1456 }
1457}
1458
1459impl From<AsyncItemHandle> for PendingKvInsertHandle {
1460 fn from(h: AsyncItemHandle) -> PendingKvInsertHandle {
1461 PendingKvInsertHandle::from(h.as_u32())
1462 }
1463}
1464
1465impl From<PendingKvDeleteHandle> for AsyncItemHandle {
1466 fn from(h: PendingKvDeleteHandle) -> AsyncItemHandle {
1467 AsyncItemHandle::from_u32(h.into())
1468 }
1469}
1470
1471impl From<AsyncItemHandle> for PendingKvDeleteHandle {
1472 fn from(h: AsyncItemHandle) -> PendingKvDeleteHandle {
1473 PendingKvDeleteHandle::from(h.as_u32())
1474 }
1475}
1476
1477impl From<PendingKvListHandle> for AsyncItemHandle {
1478 fn from(h: PendingKvListHandle) -> AsyncItemHandle {
1479 AsyncItemHandle::from_u32(h.into())
1480 }
1481}
1482
1483impl From<AsyncItemHandle> for PendingKvListHandle {
1484 fn from(h: AsyncItemHandle) -> PendingKvListHandle {
1485 PendingKvListHandle::from(h.as_u32())
1486 }
1487}
1488
1489impl From<KvStoreLookupHandle> for AsyncItemHandle {
1490 fn from(h: KvStoreLookupHandle) -> AsyncItemHandle {
1491 AsyncItemHandle::from_u32(h.into())
1492 }
1493}
1494
1495impl From<AsyncItemHandle> for KvStoreLookupHandle {
1496 fn from(h: AsyncItemHandle) -> KvStoreLookupHandle {
1497 KvStoreLookupHandle::from(h.as_u32())
1498 }
1499}
1500
1501impl From<KvStoreInsertHandle> for AsyncItemHandle {
1502 fn from(h: KvStoreInsertHandle) -> AsyncItemHandle {
1503 AsyncItemHandle::from_u32(h.into())
1504 }
1505}
1506
1507impl From<AsyncItemHandle> for KvStoreInsertHandle {
1508 fn from(h: AsyncItemHandle) -> KvStoreInsertHandle {
1509 KvStoreInsertHandle::from(h.as_u32())
1510 }
1511}
1512
1513impl From<KvStoreDeleteHandle> for AsyncItemHandle {
1514 fn from(h: KvStoreDeleteHandle) -> AsyncItemHandle {
1515 AsyncItemHandle::from_u32(h.into())
1516 }
1517}
1518
1519impl From<AsyncItemHandle> for KvStoreDeleteHandle {
1520 fn from(h: AsyncItemHandle) -> KvStoreDeleteHandle {
1521 KvStoreDeleteHandle::from(h.as_u32())
1522 }
1523}
1524
1525impl From<KvStoreListHandle> for AsyncItemHandle {
1526 fn from(h: KvStoreListHandle) -> AsyncItemHandle {
1527 AsyncItemHandle::from_u32(h.into())
1528 }
1529}
1530
1531impl From<AsyncItemHandle> for KvStoreListHandle {
1532 fn from(h: AsyncItemHandle) -> KvStoreListHandle {
1533 KvStoreListHandle::from(h.as_u32())
1534 }
1535}
1536
1537impl From<AsyncItemHandle> for CacheHandle {
1538 fn from(h: AsyncItemHandle) -> CacheHandle {
1539 CacheHandle::from(h.as_u32())
1540 }
1541}
1542
1543impl From<CacheHandle> for AsyncItemHandle {
1544 fn from(h: CacheHandle) -> AsyncItemHandle {
1545 AsyncItemHandle::from_u32(h.into())
1546 }
1547}
1548
1549impl From<AsyncItemHandle> for CacheBusyHandle {
1550 fn from(h: AsyncItemHandle) -> CacheBusyHandle {
1551 CacheBusyHandle::from(h.as_u32())
1552 }
1553}
1554
1555impl From<CacheBusyHandle> for AsyncItemHandle {
1556 fn from(h: CacheBusyHandle) -> AsyncItemHandle {
1557 AsyncItemHandle::from_u32(h.into())
1558 }
1559}
1560
1561impl From<AsyncItemHandle> for RequestPromiseHandle {
1562 fn from(h: AsyncItemHandle) -> RequestPromiseHandle {
1563 RequestPromiseHandle::from(h.as_u32())
1564 }
1565}
1566
1567impl From<RequestPromiseHandle> for AsyncItemHandle {
1568 fn from(h: RequestPromiseHandle) -> AsyncItemHandle {
1569 AsyncItemHandle::from_u32(h.into())
1570 }
1571}
1572
1573impl From<CacheBusyHandle> for CacheHandle {
1575 fn from(h: CacheBusyHandle) -> CacheHandle {
1576 let raw: u32 = h.into();
1577 CacheHandle::from(raw)
1578 }
1579}