Skip to main content

varnish_sys/vcl/
processor.rs

1//! Varnish has the ability to modify the body of object leaving its cache using delivery
2//! processors, named `VDP` in the C API, and implemented here using the [`DeliveryProcessor`] trait.
3//! Processors are linked together and will read, modify and push data down the delivery pipeline.
4//!
5//! *Note:* The rust wrapper here is pretty thin and the vmod writer will most probably need to have to
6//! deal with the raw Varnish internals.
7
8use std::ffi::{c_int, c_void, CStr};
9use std::ptr;
10
11use crate::ffi::{vdp_ctx, vfp_ctx, vfp_entry, vrt_ctx, VdpAction, VfpStatus};
12use crate::vcl::{Ctx, VclError};
13use crate::{ffi, validate_vfp_ctx, validate_vfp_entry};
14
15/// The return type for [`DeliveryProcessor::push`]
16#[derive(Debug, Copy, Clone)]
17pub enum PushResult {
18    /// Indicates a failure, the pipeline will be stopped with an error
19    Err,
20    /// Nothing special, processing should continue
21    Ok,
22    /// Stop early, without error
23    End,
24}
25
26/// The return type for [`FetchProcessor::pull`]
27#[derive(Debug, Copy, Clone)]
28pub enum PullResult {
29    /// Indicates a failure, the pipeline will be stopped with an error
30    Err,
31    /// Specify how many bytes were written to the buffer, and that the processor is ready for the
32    /// next call
33    Ok(usize),
34    /// The processor is done, and returns how many bytes were treated
35    End(usize),
36}
37
38/// The return type for [`DeliveryProcessor::new`] and [`FetchProcessor::new`]
39#[derive(Debug)]
40pub enum InitResult<T> {
41    Err(VclError),
42    Ok(T),
43    Pass,
44}
45
46/// Describes a Varnish Delivery Processor (VDP)
47pub trait DeliveryProcessor: Sized {
48    /// The name of the processor.
49    fn name() -> &'static CStr;
50    /// Create a new processor, possibly using knowledge from the pipeline, or from the current
51    /// request.
52    fn new(vrt_ctx: &mut Ctx, vdp_ctx: &mut DeliveryProcCtx) -> InitResult<Self>;
53    /// Handle the data buffer from the previous processor. This function generally uses
54    /// [`DeliveryProcCtx::push`] to push data to the next processor.
55    fn push(&mut self, ctx: &mut DeliveryProcCtx, act: VdpAction, buf: &[u8]) -> PushResult;
56}
57
58pub unsafe extern "C" fn gen_vdp_init<T: DeliveryProcessor>(
59    vrt_ctx: *const vrt_ctx,
60    ctx_raw: *mut vdp_ctx,
61    priv_: *mut *mut c_void,
62) -> c_int {
63    assert_ne!(priv_, ptr::null_mut());
64    assert_eq!(*priv_, ptr::null_mut());
65    match T::new(
66        &mut Ctx::from_ptr(vrt_ctx),
67        &mut DeliveryProcCtx::from_ptr(ctx_raw),
68    ) {
69        InitResult::Ok(proc) => {
70            *priv_ = Box::into_raw(Box::new(proc)).cast::<c_void>();
71            0
72        }
73        InitResult::Err(_) => -1, // TODO: log error
74        InitResult::Pass => 1,
75    }
76}
77
78pub unsafe extern "C" fn gen_vdp_fini<T: DeliveryProcessor>(
79    _: *mut vdp_ctx,
80    priv_: *mut *mut c_void,
81) -> c_int {
82    if !priv_.is_null() {
83        assert_ne!(*priv_, ptr::null_mut());
84        drop(Box::from_raw((*priv_).cast::<T>()));
85        *priv_ = ptr::null_mut();
86    }
87
88    0
89}
90
91pub unsafe extern "C" fn gen_vdp_push<T: DeliveryProcessor>(
92    ctx_raw: *mut vdp_ctx,
93    act: VdpAction,
94    priv_: *mut *mut c_void,
95    ptr: *const c_void,
96    len: isize,
97) -> c_int {
98    assert_ne!(priv_, ptr::null_mut());
99    assert_ne!(*priv_, ptr::null_mut());
100    if !matches!(act, VdpAction::Null | VdpAction::Flush | VdpAction::End) {
101        return 1; /* TODO: log */
102    }
103
104    let buf = if ptr.is_null() {
105        &[0; 0]
106    } else {
107        std::slice::from_raw_parts(ptr.cast::<u8>(), len as usize)
108    };
109
110    match (*(*priv_).cast::<T>()).push(&mut DeliveryProcCtx::from_ptr(ctx_raw), act, buf) {
111        PushResult::Err => -1, // TODO: log error
112        PushResult::Ok => 0,
113        PushResult::End => 1,
114    }
115}
116
117/// Create a `ffi::vdp` that can be fed to `ffi::VRT_AddFilter`
118pub fn new_vdp<T: DeliveryProcessor>() -> ffi::vdp {
119    ffi::vdp {
120        name: T::name().as_ptr(),
121        init: Some(gen_vdp_init::<T>),
122        bytes: Some(gen_vdp_push::<T>),
123        fini: Some(gen_vdp_fini::<T>),
124        priv1: ptr::null(),
125        io_init: None,
126        io_upgrade: None,
127        io_lease: None,
128        io_fini: None,
129    }
130}
131
132/// A thin wrapper around a `*mut ffi::vdp_ctx`
133#[derive(Debug)]
134pub struct DeliveryProcCtx<'a> {
135    pub raw: &'a mut vdp_ctx,
136}
137
138impl DeliveryProcCtx<'_> {
139    /// Check the pointer validity and returns the rust equivalent.
140    ///
141    /// # Safety
142    ///
143    /// The caller is in charge of making sure the structure doesn't outlive the pointer.
144    pub(crate) unsafe fn from_ptr(raw: *mut vdp_ctx) -> Self {
145        let raw = raw.as_mut().expect("vdp_ctx pointer must not be null");
146        assert_eq!(raw.magic, ffi::VDP_CTX_MAGIC);
147        Self { raw }
148    }
149
150    /// Send buffer down the pipeline
151    pub fn push(&mut self, act: VdpAction, buf: &[u8]) -> PushResult {
152        match unsafe {
153            ffi::VDP_bytes(
154                self.raw,
155                act,
156                buf.as_ptr().cast::<c_void>(),
157                buf.len() as isize,
158            )
159        } {
160            r if r < 0 => PushResult::Err,
161            0 => PushResult::Ok,
162            _ => PushResult::End,
163        }
164    }
165}
166
167/// Describes a Varnish Fetch Processor (VFP)
168pub trait FetchProcessor: Sized {
169    /// The name of the processor.
170    fn name() -> &'static CStr;
171    /// Create a new processor, possibly using knowledge from the pipeline
172    fn new(vrt_ctx: &mut Ctx, vfp_ctx: &mut FetchProcCtx) -> InitResult<Self>;
173    /// Write data into `buf`, generally using `VFP_Suck` to collect data from the previous
174    /// processor.
175    fn pull(&mut self, ctx: &mut FetchProcCtx, buf: &mut [u8]) -> PullResult;
176}
177
178unsafe extern "C" fn wrap_vfp_init<T: FetchProcessor>(
179    vrt_ctx: *const vrt_ctx,
180    ctxp: *mut vfp_ctx,
181    vfep: *mut vfp_entry,
182) -> VfpStatus {
183    let ctx = validate_vfp_ctx(ctxp);
184    let vfe = validate_vfp_entry(vfep);
185    match T::new(
186        &mut Ctx::from_ptr(vrt_ctx),
187        &mut FetchProcCtx::from_ptr(ctx),
188    ) {
189        InitResult::Ok(proc) => {
190            vfe.priv1 = Box::into_raw(Box::new(proc)).cast::<c_void>();
191            VfpStatus::Ok
192        }
193        InitResult::Err(_) => VfpStatus::Error, // TODO: log the error,
194        InitResult::Pass => VfpStatus::End,
195    }
196}
197
198pub unsafe extern "C" fn wrap_vfp_pull<T: FetchProcessor>(
199    ctxp: *mut vfp_ctx,
200    vfep: *mut vfp_entry,
201    ptr: *mut c_void,
202    len: *mut isize,
203) -> VfpStatus {
204    let ctx = validate_vfp_ctx(ctxp);
205    let vfe = validate_vfp_entry(vfep);
206    let buf = if ptr.is_null() {
207        [0; 0].as_mut()
208    } else {
209        std::slice::from_raw_parts_mut(ptr.cast::<u8>(), *len as usize)
210    };
211    let Some(obj) = vfe.priv1.cast::<T>().as_mut() else {
212        // Avoid panicking across the FFI boundary if the invariant is violated.
213        return VfpStatus::Error;
214    };
215    match obj.pull(&mut FetchProcCtx::from_ptr(ctx), buf) {
216        PullResult::Err => VfpStatus::Error, // TODO: log error
217        PullResult::Ok(l) => {
218            *len = l as isize;
219            VfpStatus::Ok
220        }
221        PullResult::End(l) => {
222            *len = l as isize;
223            VfpStatus::End
224        }
225    }
226}
227
228pub unsafe extern "C" fn wrap_vfp_fini<T: FetchProcessor>(
229    ctxp: *mut vfp_ctx,
230    vfep: *mut vfp_entry,
231) {
232    validate_vfp_ctx(ctxp);
233    let vfe = validate_vfp_entry(vfep);
234    if !vfe.priv1.is_null() {
235        let p = ptr::replace(&raw mut vfe.priv1, ptr::null_mut());
236        drop(Box::from_raw(p.cast::<T>()));
237    }
238}
239
240/// Create a `ffi::vfp` that can be fed to `ffi::VRT_AddFilter`
241pub fn new_vfp<T: FetchProcessor>() -> ffi::vfp {
242    ffi::vfp {
243        name: T::name().as_ptr(),
244        init: Some(wrap_vfp_init::<T>),
245        pull: Some(wrap_vfp_pull::<T>),
246        fini: Some(wrap_vfp_fini::<T>),
247        priv1: ptr::null(),
248    }
249}
250
251/// A thin wrapper around a `*mut ffi::vfp_ctx`
252#[derive(Debug)]
253pub struct FetchProcCtx<'a> {
254    pub raw: &'a mut vfp_ctx,
255}
256
257impl FetchProcCtx<'_> {
258    /// Check the pointer validity and returns the rust equivalent.
259    ///
260    /// # Safety
261    ///
262    /// The caller is in charge of making sure the structure doesn't outlive the pointer.
263    pub(crate) unsafe fn from_ptr(raw: *mut vfp_ctx) -> Self {
264        Self {
265            raw: validate_vfp_ctx(raw),
266        }
267    }
268
269    /// Pull data from the pipeline
270    pub fn pull(&mut self, buf: &mut [u8]) -> PullResult {
271        let mut len = buf.len() as isize;
272        let max_len = len;
273
274        match unsafe { ffi::VFP_Suck(self.raw, buf.as_ptr() as *mut c_void, &raw mut len) } {
275            VfpStatus::Ok => {
276                assert!(len <= max_len);
277                assert!(len >= 0);
278                PullResult::Ok(len as usize)
279            }
280            VfpStatus::End => {
281                assert!(len <= max_len);
282                assert!(len >= 0);
283                PullResult::End(len as usize)
284            }
285            VfpStatus::Error => PullResult::Err,
286            VfpStatus::Null => panic!("VFP_Suck() was never supposed to return VFP_NULL!"),
287            // In the future, there might be more enum values, so we should ensure it continues
288            // to compile, but we do want a warning when developing locally to add the new one.
289            #[expect(unreachable_patterns)]
290            n => panic!("unknown VfpStatus {n:?}"),
291        }
292    }
293}
294
295#[derive(Debug)]
296pub struct FetchFilters<'c, 'f> {
297    ctx: &'c vrt_ctx,
298    // The pointer to the box content must be stable.
299    // Storing values directly in the vector might be moved when the vector grows.
300    #[expect(clippy::vec_box)]
301    filters: &'f mut Vec<Box<ffi::vfp>>,
302}
303
304impl<'c, 'f> FetchFilters<'c, 'f> {
305    #[expect(clippy::vec_box)]
306    pub(crate) fn new(ctx: &'c vrt_ctx, filters: &'f mut Vec<Box<ffi::vfp>>) -> Self {
307        Self { ctx, filters }
308    }
309
310    fn find_position<T: FetchProcessor>(&self) -> Option<usize> {
311        let name = T::name().as_ptr();
312        self.filters.iter().position(|f| f.name == name)
313    }
314
315    pub fn register<T: FetchProcessor>(&mut self) -> bool {
316        if self.find_position::<T>().is_none() {
317            let instance = Box::new(new_vfp::<T>());
318            unsafe {
319                ffi::VRT_AddFilter(self.ctx, instance.as_ref(), ptr::null());
320            }
321            self.filters.push(instance);
322            true
323        } else {
324            false
325        }
326    }
327
328    pub fn unregister<T: FetchProcessor>(&mut self) -> bool {
329        if let Some(pos) = self.find_position::<T>() {
330            let filter = self.filters.swap_remove(pos);
331            unsafe {
332                ffi::VRT_RemoveFilter(self.ctx, filter.as_ref(), ptr::null());
333            }
334            true
335        } else {
336            false
337        }
338    }
339
340    pub fn unregister_all(&mut self) {
341        for filter in self.filters.drain(..) {
342            unsafe { ffi::VRT_RemoveFilter(self.ctx, filter.as_ref(), ptr::null()) }
343        }
344    }
345}
346
347#[derive(Debug)]
348pub struct DeliveryFilters<'c, 'f> {
349    ctx: &'c vrt_ctx,
350    // The pointer to the box content must be stable.
351    // Storing values directly in the vector might be moved when the vector grows.
352    #[expect(clippy::vec_box)]
353    filters: &'f mut Vec<Box<ffi::vdp>>,
354}
355
356impl<'c, 'f> DeliveryFilters<'c, 'f> {
357    #[expect(clippy::vec_box)]
358    pub(crate) fn new(ctx: &'c vrt_ctx, filters: &'f mut Vec<Box<ffi::vdp>>) -> Self {
359        Self { ctx, filters }
360    }
361
362    fn find_position<T: DeliveryProcessor>(&self) -> Option<usize> {
363        let name = T::name().as_ptr();
364        self.filters.iter().position(|f| f.name == name)
365    }
366
367    pub fn register<T: DeliveryProcessor>(&mut self) -> bool {
368        if self.find_position::<T>().is_none() {
369            let instance = Box::new(new_vdp::<T>());
370            unsafe {
371                ffi::VRT_AddFilter(self.ctx, ptr::null(), instance.as_ref());
372            }
373            self.filters.push(instance);
374            true
375        } else {
376            false
377        }
378    }
379
380    pub fn unregister<T: DeliveryProcessor>(&mut self) -> bool {
381        if let Some(pos) = self.find_position::<T>() {
382            let filter = self.filters.swap_remove(pos);
383            unsafe {
384                ffi::VRT_RemoveFilter(self.ctx, ptr::null(), filter.as_ref());
385            }
386            true
387        } else {
388            false
389        }
390    }
391
392    pub fn unregister_all(&mut self) {
393        for filter in self.filters.drain(..) {
394            unsafe { ffi::VRT_RemoveFilter(self.ctx, ptr::null(), filter.as_ref()) }
395        }
396    }
397}