1use core::str;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use http::HeaderMap;
7
8use crate::body::Body;
9use crate::cache::{CacheKey, SurrogateKeySet, VaryRule, WriteOptions};
10use crate::session::{PeekableTask, PendingCacheTask, Session};
11use crate::wiggle_abi::types::CacheWriteOptionsMask;
12
13use super::fastly_cache::FastlyCache;
14use super::{types, Error};
15
16fn load_cache_key(
17 memory: &wiggle::GuestMemory<'_>,
18 cache_key: wiggle::GuestPtr<[u8]>,
19) -> Result<CacheKey, Error> {
20 let bytes = memory.as_slice(cache_key)?.ok_or(Error::SharedMemory)?;
21 let key: CacheKey = bytes.try_into().map_err(|_| Error::InvalidArgument)?;
22 Ok(key)
23}
24
25fn load_write_options(
26 memory: &wiggle::GuestMemory<'_>,
27 mut options_mask: types::CacheWriteOptionsMask,
28 options: &types::CacheWriteOptions,
29) -> Result<WriteOptions, Error> {
30 assert!(
32 !options_mask.contains(CacheWriteOptionsMask::REQUEST_HEADERS),
33 "Viceroy bug! headers must be handled before load_write_options"
34 );
35
36 let max_age = Duration::from_nanos(options.max_age_ns);
38
39 let initial_age = if options_mask.contains(CacheWriteOptionsMask::INITIAL_AGE_NS) {
40 Duration::from_nanos(options.initial_age_ns)
41 } else {
42 Duration::ZERO
43 };
44
45 options_mask &= !CacheWriteOptionsMask::INITIAL_AGE_NS;
46
47 let stale_while_revalidate =
48 if options_mask.contains(CacheWriteOptionsMask::STALE_WHILE_REVALIDATE_NS) {
49 Duration::from_nanos(options.stale_while_revalidate_ns)
50 } else {
51 Duration::ZERO
52 };
53 options_mask &= !CacheWriteOptionsMask::STALE_WHILE_REVALIDATE_NS;
54
55 let vary_rule = if options_mask.contains(CacheWriteOptionsMask::VARY_RULE) {
56 let slice = options.vary_rule_ptr.as_array(options.vary_rule_len);
57 let vary_rule_bytes = memory.as_slice(slice)?.ok_or(Error::SharedMemory)?;
58 let vary_rule_str = str::from_utf8(vary_rule_bytes).map_err(|e| Error::Utf8Expected(e))?;
59 vary_rule_str.parse()?
60 } else {
61 VaryRule::default()
62 };
63 options_mask &= !CacheWriteOptionsMask::VARY_RULE;
64
65 let user_metadata = if options_mask.contains(CacheWriteOptionsMask::USER_METADATA) {
66 let slice = options
67 .user_metadata_ptr
68 .as_array(options.user_metadata_len);
69 let user_metadata_bytes = memory.as_slice(slice)?.ok_or(Error::SharedMemory)?;
70 Bytes::copy_from_slice(user_metadata_bytes)
71 } else {
72 Bytes::new()
73 };
74 options_mask &= !CacheWriteOptionsMask::USER_METADATA;
75
76 let length = if options_mask.contains(CacheWriteOptionsMask::LENGTH) {
77 Some(options.length)
78 } else {
79 None
80 };
81 options_mask &= !CacheWriteOptionsMask::LENGTH;
82
83 let sensitive_data = options_mask.contains(CacheWriteOptionsMask::SENSITIVE_DATA);
84 options_mask &= !CacheWriteOptionsMask::SENSITIVE_DATA;
85
86 if options_mask.contains(CacheWriteOptionsMask::SERVICE_ID) {
88 return Err(Error::Unsupported {
89 msg: "cache on_behalf_of is not supported in Viceroy",
90 });
91 }
92 options_mask &= !CacheWriteOptionsMask::SERVICE_ID;
93
94 let edge_max_age = if options_mask.contains(CacheWriteOptionsMask::EDGE_MAX_AGE_NS) {
95 Duration::from_nanos(options.edge_max_age_ns)
96 } else {
97 max_age
98 };
99 if edge_max_age > max_age {
100 tracing::error!(
101 "deliver node max age {} must be less than TTL {}",
102 edge_max_age.as_secs(),
103 max_age.as_secs()
104 );
105 return Err(Error::InvalidArgument);
106 }
107 options_mask &= !CacheWriteOptionsMask::EDGE_MAX_AGE_NS;
108
109 let surrogate_keys = if options_mask.contains(CacheWriteOptionsMask::SURROGATE_KEYS) {
110 let slice = options
111 .surrogate_keys_ptr
112 .as_array(options.surrogate_keys_len);
113 let surrogate_keys_bytes = memory.as_slice(slice)?.ok_or(Error::SharedMemory)?;
114 surrogate_keys_bytes.try_into()?
115 } else {
116 SurrogateKeySet::default()
117 };
118 options_mask &= !CacheWriteOptionsMask::SURROGATE_KEYS;
119
120 if !options_mask.is_empty() {
121 return Err(Error::NotAvailable("unknown cache write option"));
122 }
123
124 Ok(WriteOptions {
125 max_age,
126 initial_age,
127 stale_while_revalidate,
128 vary_rule,
129 user_metadata,
130 length,
131 sensitive_data,
132 edge_max_age,
133 surrogate_keys,
134 })
135}
136
137struct LookupOptions {
138 headers: HeaderMap,
139 always_use_requested_range: bool,
140}
141
142fn load_lookup_options(
143 session: &Session,
144 memory: &wiggle::GuestMemory<'_>,
145 mut options_mask: types::CacheLookupOptionsMask,
146 options: wiggle::GuestPtr<types::CacheLookupOptions>,
147) -> Result<LookupOptions, Error> {
148 let options = memory.read(options)?;
149 let headers = if options_mask.contains(types::CacheLookupOptionsMask::REQUEST_HEADERS) {
150 let handle = options.request_headers;
151 let parts = session.request_parts(handle)?;
152 parts.headers.clone()
153 } else {
154 HeaderMap::default()
155 };
156
157 options_mask &= !types::CacheLookupOptionsMask::REQUEST_HEADERS;
158
159 if options_mask.contains(types::CacheLookupOptionsMask::SERVICE_ID) {
160 return Err(Error::Unsupported {
162 msg: "service ID in cache lookup is not supported in Viceroy",
163 });
164 }
165
166 let always_use_requested_range =
167 options_mask.contains(types::CacheLookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE);
168 options_mask &= !types::CacheLookupOptionsMask::ALWAYS_USE_REQUESTED_RANGE;
169
170 if !options_mask.is_empty() {
171 return Err(Error::NotAvailable("unknown cache lookup option"));
172 }
173
174 Ok(LookupOptions {
175 headers,
176 always_use_requested_range,
177 })
178}
179
180#[allow(unused_variables)]
181#[wiggle::async_trait]
182impl FastlyCache for Session {
183 async fn lookup(
184 &mut self,
185 memory: &mut wiggle::GuestMemory<'_>,
186 cache_key: wiggle::GuestPtr<[u8]>,
187 options_mask: types::CacheLookupOptionsMask,
188 options: wiggle::GuestPtr<types::CacheLookupOptions>,
189 ) -> Result<types::CacheHandle, Error> {
190 let LookupOptions {
191 headers,
192 always_use_requested_range,
193 } = load_lookup_options(&self, memory, options_mask, options)?;
194 let key = load_cache_key(memory, cache_key)?;
195 let cache = Arc::clone(self.cache());
196
197 let task = PeekableTask::spawn(Box::pin(async move {
198 Ok(cache
199 .lookup(&key, &headers)
200 .await
201 .with_always_use_requested_range(always_use_requested_range))
202 }))
203 .await;
204 let task = PendingCacheTask::new(task);
205 let handle = self.insert_cache_op(task);
206 Ok(handle.into())
207 }
208
209 async fn insert(
210 &mut self,
211 memory: &mut wiggle::GuestMemory<'_>,
212 cache_key: wiggle::GuestPtr<[u8]>,
213 options_mask: types::CacheWriteOptionsMask,
214 options: wiggle::GuestPtr<types::CacheWriteOptions>,
215 ) -> Result<types::BodyHandle, Error> {
216 let key = load_cache_key(memory, cache_key)?;
217 let guest_options = memory.read(options)?;
218
219 let request_headers = if options_mask.contains(CacheWriteOptionsMask::REQUEST_HEADERS) {
221 let handle = guest_options.request_headers;
222 let parts = self.request_parts(handle)?;
223 parts.headers.clone()
224 } else {
225 HeaderMap::default()
226 };
227 let options = load_write_options(
228 memory,
229 options_mask & !CacheWriteOptionsMask::REQUEST_HEADERS,
230 &guest_options,
231 )?;
232 let cache = Arc::clone(self.cache());
233
234 let handle = self.insert_body(Body::empty());
235 let read_body = self.begin_streaming(handle)?;
236 cache
237 .insert(&key, request_headers, options, read_body)
238 .await;
239 Ok(handle)
240 }
241
242 async fn replace(
243 &mut self,
244 memory: &mut wiggle::GuestMemory<'_>,
245 cache_key: wiggle::GuestPtr<[u8]>,
246 options_mask: types::CacheReplaceOptionsMask,
247 abi_options: wiggle::GuestPtr<types::CacheReplaceOptions>,
248 ) -> Result<types::CacheReplaceHandle, Error> {
249 Err(Error::NotAvailable("Cache API primitives"))
250 }
251
252 async fn replace_get_age_ns(
253 &mut self,
254 memory: &mut wiggle::GuestMemory<'_>,
255 cache_handle: types::CacheReplaceHandle,
256 ) -> Result<types::CacheDurationNs, Error> {
257 Err(Error::NotAvailable("Cache API primitives"))
258 }
259
260 async fn replace_get_body(
261 &mut self,
262 memory: &mut wiggle::GuestMemory<'_>,
263 cache_handle: types::CacheReplaceHandle,
264 options_mask: types::CacheGetBodyOptionsMask,
265 options: &types::CacheGetBodyOptions,
266 ) -> Result<types::BodyHandle, Error> {
267 Err(Error::NotAvailable("Cache API primitives"))
268 }
269
270 async fn replace_get_hits(
271 &mut self,
272 memory: &mut wiggle::GuestMemory<'_>,
273 cache_handle: types::CacheReplaceHandle,
274 ) -> Result<u64, Error> {
275 Err(Error::NotAvailable("Cache API primitives"))
276 }
277
278 async fn replace_get_length(
279 &mut self,
280 memory: &mut wiggle::GuestMemory<'_>,
281 cache_handle: types::CacheReplaceHandle,
282 ) -> Result<u64, Error> {
283 Err(Error::NotAvailable("Cache API primitives"))
284 }
285
286 async fn replace_get_max_age_ns(
287 &mut self,
288 memory: &mut wiggle::GuestMemory<'_>,
289 cache_handle: types::CacheReplaceHandle,
290 ) -> Result<types::CacheDurationNs, Error> {
291 Err(Error::NotAvailable("Cache API primitives"))
292 }
293
294 async fn replace_get_stale_while_revalidate_ns(
295 &mut self,
296 memory: &mut wiggle::GuestMemory<'_>,
297 cache_handle: types::CacheReplaceHandle,
298 ) -> Result<types::CacheDurationNs, Error> {
299 Err(Error::NotAvailable("Cache API primitives"))
300 }
301
302 async fn replace_get_state(
303 &mut self,
304 memory: &mut wiggle::GuestMemory<'_>,
305 cache_handle: types::CacheReplaceHandle,
306 ) -> Result<types::CacheLookupState, Error> {
307 Err(Error::NotAvailable("Cache API primitives"))
308 }
309
310 async fn replace_get_user_metadata(
311 &mut self,
312 memory: &mut wiggle::GuestMemory<'_>,
313 cache_handle: types::CacheReplaceHandle,
314 out_ptr: wiggle::GuestPtr<u8>,
315 out_len: u32,
316 nwritten_out: wiggle::GuestPtr<u32>,
317 ) -> Result<(), Error> {
318 Err(Error::NotAvailable("Cache API primitives"))
319 }
320
321 async fn replace_insert(
322 &mut self,
323 memory: &mut wiggle::GuestMemory<'_>,
324 cache_handle: types::CacheReplaceHandle,
325 options_mask: types::CacheWriteOptionsMask,
326 abi_options: wiggle::GuestPtr<types::CacheWriteOptions>,
327 ) -> Result<types::BodyHandle, Error> {
328 Err(Error::NotAvailable("Cache API primitives"))
329 }
330
331 async fn transaction_lookup(
332 &mut self,
333 memory: &mut wiggle::GuestMemory<'_>,
334 cache_key: wiggle::GuestPtr<[u8]>,
335 options_mask: types::CacheLookupOptionsMask,
336 options: wiggle::GuestPtr<types::CacheLookupOptions>,
337 ) -> Result<types::CacheHandle, Error> {
338 let h = self
339 .transaction_lookup_async(memory, cache_key, options_mask, options)
340 .await?;
341 self.cache_busy_handle_wait(memory, h).await
342 }
343
344 async fn transaction_lookup_async(
345 &mut self,
346 memory: &mut wiggle::GuestMemory<'_>,
347 cache_key: wiggle::GuestPtr<[u8]>,
348 options_mask: types::CacheLookupOptionsMask,
349 options: wiggle::GuestPtr<types::CacheLookupOptions>,
350 ) -> Result<types::CacheBusyHandle, Error> {
351 let LookupOptions {
352 headers,
353 always_use_requested_range,
354 } = load_lookup_options(&self, memory, options_mask, options)?;
355 let key = load_cache_key(memory, cache_key)?;
356 let cache = Arc::clone(self.cache());
357
358 let e = cache
360 .transaction_lookup(&key, &headers, false)
361 .await
362 .with_always_use_requested_range(always_use_requested_range);
363 let ready = e.found().is_some() || e.go_get().is_some();
364 let task = if ready {
367 PeekableTask::complete(e)
368 } else {
369 PeekableTask::spawn(Box::pin(async move {
370 Ok(cache
371 .transaction_lookup(&key, &headers, true)
372 .await
373 .with_always_use_requested_range(always_use_requested_range))
374 }))
375 .await
376 };
377
378 let task = PendingCacheTask::new(task);
379 let handle = self.insert_cache_op(task);
380 Ok(handle.into())
381 }
382
383 async fn cache_busy_handle_wait(
384 &mut self,
385 memory: &mut wiggle::GuestMemory<'_>,
386 handle: types::CacheBusyHandle,
387 ) -> Result<types::CacheHandle, Error> {
388 let handle = handle.into();
389 let entry = self.cache_entry_mut(handle).await?;
391 let mut other_entry = entry.stub();
392 std::mem::swap(entry, &mut other_entry);
393 let task = PeekableTask::spawn(Box::pin(async move { Ok(other_entry) })).await;
394 Ok(self.insert_cache_op(PendingCacheTask::new(task)).into())
395 }
396
397 async fn transaction_insert(
398 &mut self,
399 memory: &mut wiggle::GuestMemory<'_>,
400 handle: types::CacheHandle,
401 options_mask: types::CacheWriteOptionsMask,
402 options: wiggle::GuestPtr<types::CacheWriteOptions>,
403 ) -> Result<types::BodyHandle, Error> {
404 let (body, cache_handle) = self
405 .transaction_insert_and_stream_back(memory, handle, options_mask, options)
406 .await?;
407 let _ = self.take_cache_entry(cache_handle)?;
409 Ok(body)
410 }
411
412 async fn transaction_insert_and_stream_back(
413 &mut self,
414 memory: &mut wiggle::GuestMemory<'_>,
415 handle: types::CacheHandle,
416 options_mask: types::CacheWriteOptionsMask,
417 options: wiggle::GuestPtr<types::CacheWriteOptions>,
418 ) -> Result<(types::BodyHandle, types::CacheHandle), Error> {
419 let guest_options = memory.read(options)?;
420 if options_mask.contains(CacheWriteOptionsMask::REQUEST_HEADERS) {
422 return Err(Error::InvalidArgument);
423 }
424 let options = load_write_options(memory, options_mask, &guest_options)?;
425
426 let body_handle = self.insert_body(Body::empty());
428 let read_body = self.begin_streaming(body_handle)?;
429
430 let e = self
431 .cache_entry_mut(handle)
432 .await?
433 .insert(options, read_body)?;
434
435 let handle = self.insert_cache_op(PendingCacheTask::new(PeekableTask::complete(e)));
437
438 Ok((body_handle, handle.into()))
439 }
440
441 async fn transaction_update(
442 &mut self,
443 memory: &mut wiggle::GuestMemory<'_>,
444 handle: types::CacheHandle,
445 options_mask: types::CacheWriteOptionsMask,
446 options: wiggle::GuestPtr<types::CacheWriteOptions>,
447 ) -> Result<(), Error> {
448 let guest_options = memory.read(options)?;
449 if options_mask.contains(CacheWriteOptionsMask::REQUEST_HEADERS) {
451 return Err(Error::InvalidArgument);
452 }
453 let options = load_write_options(memory, options_mask, &guest_options)?;
454
455 let entry = self.cache_entry_mut(handle).await?;
456 entry.update(options).await?;
460
461 Ok(())
462 }
463
464 async fn transaction_cancel(
465 &mut self,
466 memory: &mut wiggle::GuestMemory<'_>,
467 handle: types::CacheHandle,
468 ) -> Result<(), Error> {
469 let entry = self.cache_entry_mut(handle.into()).await?;
470 if entry.cancel() {
471 Ok(())
472 } else {
473 Err(Error::CacheError(crate::cache::Error::CannotWrite).into())
474 }
475 }
476
477 async fn close_busy(
478 &mut self,
479 memory: &mut wiggle::GuestMemory<'_>,
480 handle: types::CacheBusyHandle,
481 ) -> Result<(), Error> {
482 let _ = self.take_cache_entry(handle.into())?;
484 Ok(())
485 }
486
487 async fn close(
488 &mut self,
489 memory: &mut wiggle::GuestMemory<'_>,
490 handle: types::CacheHandle,
491 ) -> Result<(), Error> {
492 let _ = self.take_cache_entry(handle)?.task().recv().await?;
493 Ok(())
494 }
495
496 async fn get_state(
497 &mut self,
498 memory: &mut wiggle::GuestMemory<'_>,
499 handle: types::CacheHandle,
500 ) -> Result<types::CacheLookupState, Error> {
501 let entry = self.cache_entry_mut(handle).await?;
502
503 let mut state = types::CacheLookupState::empty();
504 if let Some(found) = entry.found() {
505 if found.meta().is_usable() {
509 state |= types::CacheLookupState::USABLE;
510 state |= types::CacheLookupState::FOUND;
511
512 if !found.meta().is_fresh() {
513 state |= types::CacheLookupState::STALE;
514 }
515 }
516 }
517 if entry.go_get().is_some() {
518 state |= types::CacheLookupState::MUST_INSERT_OR_UPDATE;
519 }
520
521 Ok(state)
522 }
523
524 async fn get_user_metadata(
525 &mut self,
526 memory: &mut wiggle::GuestMemory<'_>,
527 handle: types::CacheHandle,
528 user_metadata_out_ptr: wiggle::GuestPtr<u8>,
529 user_metadata_out_len: u32,
530 nwritten_out: wiggle::GuestPtr<u32>,
531 ) -> Result<(), Error> {
532 let entry = self.cache_entry(handle.into()).await?;
533
534 let md_bytes = entry
535 .found()
536 .map(|found| found.meta().user_metadata())
537 .ok_or(crate::Error::CacheError(crate::cache::Error::Missing))?;
538 let len: u32 = md_bytes
539 .len()
540 .try_into()
541 .expect("user metadata must be shorter than u32 can indicate");
542 if len > user_metadata_out_len {
543 memory.write(nwritten_out, len)?;
544 return Err(Error::BufferLengthError {
545 buf: "user_metadata_out_ptr",
546 len: "user_metadata_out_len",
547 });
548 }
549 let user_metadata = memory
550 .as_slice_mut(user_metadata_out_ptr.as_array(user_metadata_out_len))?
551 .ok_or(Error::SharedMemory)?;
552 user_metadata[..(len as usize)].copy_from_slice(&md_bytes);
553 memory.write(nwritten_out, len)?;
554
555 Ok(())
556 }
557
558 async fn get_body(
559 &mut self,
560 memory: &mut wiggle::GuestMemory<'_>,
561 handle: types::CacheHandle,
562 mut options_mask: types::CacheGetBodyOptionsMask,
563 options: &types::CacheGetBodyOptions,
564 ) -> Result<types::BodyHandle, Error> {
565 let from = if options_mask.contains(types::CacheGetBodyOptionsMask::FROM) {
566 Some(options.from)
567 } else {
568 None
569 };
570 options_mask &= !types::CacheGetBodyOptionsMask::FROM;
571 let to = if options_mask.contains(types::CacheGetBodyOptionsMask::TO) {
572 Some(options.to)
573 } else {
574 None
575 };
576 options_mask &= !types::CacheGetBodyOptionsMask::TO;
577
578 if !options_mask.is_empty() {
579 return Err(Error::NotAvailable("unknown cache get_body option").into());
580 }
581
582 let entry = self.cache_entry(handle.into()).await?;
591
592 let body = entry.body(from, to).await?;
597
598 let found = entry
599 .found()
600 .ok_or(Error::CacheError(crate::cache::Error::Missing))?;
601
602 if let Some(prev_handle) = found.last_body_handle {
603 if self.body(prev_handle).is_ok() {
605 return Err(Error::CacheError(crate::cache::Error::HandleBodyUsed));
606 }
607 };
608
609 let body_handle = self.insert_body(body);
610 self.cache_entry_mut(handle.into())
613 .await?
614 .found_mut()
615 .unwrap()
616 .last_body_handle = Some(body_handle.into());
617
618 Ok(body_handle.into())
619 }
620
621 async fn get_length(
622 &mut self,
623 memory: &mut wiggle::GuestMemory<'_>,
624 handle: types::CacheHandle,
625 ) -> Result<types::CacheObjectLength, Error> {
626 let found = self
627 .cache_entry(handle.into())
628 .await?
629 .found()
630 .ok_or(Error::CacheError(crate::cache::Error::Missing))?;
631 found
632 .length()
633 .ok_or(Error::CacheError(crate::cache::Error::Missing))
634 }
635
636 async fn get_max_age_ns(
637 &mut self,
638 memory: &mut wiggle::GuestMemory<'_>,
639 handle: types::CacheHandle,
640 ) -> Result<types::CacheDurationNs, Error> {
641 let entry = self.cache_entry_mut(handle).await?;
642 if let Some(found) = entry.found() {
643 Ok(found.meta().max_age().as_nanos().try_into().unwrap())
644 } else {
645 Err(Error::CacheError(crate::cache::Error::Missing))
646 }
647 }
648
649 async fn get_stale_while_revalidate_ns(
650 &mut self,
651 memory: &mut wiggle::GuestMemory<'_>,
652 handle: types::CacheHandle,
653 ) -> Result<types::CacheDurationNs, Error> {
654 Err(Error::NotAvailable("Cache API primitives"))
655 }
656
657 async fn get_age_ns(
658 &mut self,
659 memory: &mut wiggle::GuestMemory<'_>,
660 handle: types::CacheHandle,
661 ) -> Result<types::CacheDurationNs, Error> {
662 let entry = self.cache_entry_mut(handle).await?;
663 if let Some(found) = entry.found() {
664 Ok(found.meta().age().as_nanos().try_into().unwrap())
665 } else {
666 Err(Error::CacheError(crate::cache::Error::Missing))
667 }
668 }
669
670 async fn get_hits(
671 &mut self,
672 memory: &mut wiggle::GuestMemory<'_>,
673 handle: types::CacheHandle,
674 ) -> Result<types::CacheHitCount, Error> {
675 Err(Error::NotAvailable("Cache API primitives"))
676 }
677}