example/
example.rs

1use std::{
2    io::Read,
3    sync::{Arc, Mutex},
4};
5
6use envoy_dynamic_modules_rust_sdk::*;
7use serde::{Deserialize, Serialize};
8
9init!(new_http_filter);
10
11/// new_http_filter is the entry point for the filter chains.
12///
13/// This function is called by the Envoy corresponding to the filter chain configuration.
14fn new_http_filter(config: &str) -> Box<dyn HttpFilter> {
15    // Each filter is written in a way that it passes the conformance tests.
16    match config {
17        "helloworld" => Box::new(HelloWorldFilter {}),
18        "delay" => Box::new(DelayFilter {
19            atomic: std::sync::atomic::AtomicUsize::new(1),
20        }),
21        "headers" => Box::new(HeadersFilter {}),
22        "bodies" => Box::new(BodiesFilter {}),
23        "bodies_replace" => Box::new(BodiesReplace {}),
24        "send_response" => Box::new(SendResponseFilter {}),
25        "validate_json" => Box::new(ValidateJsonFilter {}),
26        _ => panic!("Unknown config: {}", config),
27    }
28}
29
30/// HelloWorldFilter is a simple filter that prints a message for each filter call.
31///
32/// This implements the [`HttpFilter`] trait, and will be craeted per each filter chain.
33struct HelloWorldFilter {}
34
35impl HttpFilter for HelloWorldFilter {
36    fn new_instance(
37        &mut self,
38        _envoy_filter_instance: EnvoyFilterInstance,
39    ) -> Box<dyn HttpFilterInstance> {
40        Box::new(HelloWorldFilterInstance {})
41    }
42
43    fn destroy(&self) {
44        println!("HelloWorldFilter destroyed");
45    }
46}
47
48/// HelloWorldFilterInstance is a simple filter instance that prints a message for each filter call.
49///
50/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
51struct HelloWorldFilterInstance {}
52
53impl HttpFilterInstance for HelloWorldFilterInstance {
54    fn request_headers(
55        &mut self,
56        _request_headers: &RequestHeaders,
57        _end_of_stream: bool,
58    ) -> RequestHeadersStatus {
59        println!("RequestHeaders called");
60        RequestHeadersStatus::Continue
61    }
62
63    fn request_body(
64        &mut self,
65        _request_body_frame: &RequestBodyBuffer,
66        _end_of_stream: bool,
67    ) -> RequestBodyStatus {
68        println!("RequestBody called");
69        RequestBodyStatus::Continue
70    }
71
72    fn response_headers(
73        &mut self,
74        _response_headers: &ResponseHeaders,
75        _end_of_stream: bool,
76    ) -> ResponseHeadersStatus {
77        println!("ResponseHeaders called");
78        ResponseHeadersStatus::Continue
79    }
80
81    fn response_body(
82        &mut self,
83        _response_body_frame: &ResponseBodyBuffer,
84        _end_of_stream: bool,
85    ) -> ResponseBodyStatus {
86        println!("ResponseBody called");
87        ResponseBodyStatus::Continue
88    }
89
90    fn destroy(&mut self) {
91        println!("Destroy called");
92    }
93}
94
95/// HeadersFilter is a filter that manipulates headers.
96///
97/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
98struct HeadersFilter {}
99
100impl HttpFilter for HeadersFilter {
101    fn new_instance(
102        &mut self,
103        _envoy_filter_instance: EnvoyFilterInstance,
104    ) -> Box<dyn HttpFilterInstance> {
105        Box::new(HeadersFilterInstance {})
106    }
107}
108
109/// HeadersFilterInstance is a filter instance that manipulates headers.
110///
111/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
112struct HeadersFilterInstance {}
113
114impl HttpFilterInstance for HeadersFilterInstance {
115    fn request_headers(
116        &mut self,
117        request_headers: &RequestHeaders,
118        _end_of_stream: bool,
119    ) -> RequestHeadersStatus {
120        if let Some(value) = request_headers.get(b"foo") {
121            if value != b"value" {
122                panic!(
123                    "expected this-is to be \"value\", got {:?}",
124                    std::str::from_utf8(value).unwrap()
125                );
126            } else {
127                println!("foo: {}", std::str::from_utf8(value).unwrap());
128            }
129        }
130
131        request_headers
132            .values(b"multiple-values")
133            .iter()
134            .for_each(|value| {
135                println!("multiple-values: {}", std::str::from_utf8(value).unwrap());
136            });
137
138        request_headers.remove(b"multiple-values");
139        request_headers.set(b"foo", b"yes");
140        request_headers.set(b"multiple-values-to-be-single", b"single");
141        RequestHeadersStatus::Continue
142    }
143
144    fn response_headers(
145        &mut self,
146        response_headers: &ResponseHeaders,
147        _end_of_stream: bool,
148    ) -> ResponseHeadersStatus {
149        if let Some(value) = response_headers.get(b"this-is") {
150            if value != b"response-header" {
151                panic!(
152                    "expected this-is to be \"response-header\", got {:?}",
153                    value
154                );
155            } else {
156                println!("this-is: {}", std::str::from_utf8(value).unwrap());
157            }
158        }
159
160        response_headers
161            .values(b"this-is-2")
162            .iter()
163            .for_each(|value| {
164                println!("this-is-2: {}", std::str::from_utf8(value).unwrap());
165            });
166
167        response_headers.remove(b"this-is-2");
168        response_headers.set(b"this-is", b"response-header");
169        response_headers.set(b"multiple-values-res-to-be-single", b"single");
170
171        ResponseHeadersStatus::Continue
172    }
173}
174
175/// DelayFilter is a filter that delays the request.
176///
177/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
178struct DelayFilter {
179    atomic: std::sync::atomic::AtomicUsize,
180}
181
182impl HttpFilter for DelayFilter {
183    fn new_instance(
184        &mut self,
185        envoy_filter_instance: EnvoyFilterInstance,
186    ) -> Box<dyn HttpFilterInstance> {
187        let req_no = self
188            .atomic
189            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
190
191        let envoy_filter_instance = Arc::new(Mutex::new(Some(envoy_filter_instance)));
192        Box::new(DelayFilterInstance {
193            req_no,
194            envoy_filter_instance,
195        })
196    }
197}
198
199/// DelayFilterInstance is a filter instance that delays the request.
200///
201/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
202struct DelayFilterInstance {
203    req_no: usize,
204    envoy_filter_instance: Arc<Mutex<Option<EnvoyFilterInstance>>>,
205}
206
207impl HttpFilterInstance for DelayFilterInstance {
208    fn request_headers(
209        &mut self,
210        _request_headers: &RequestHeaders,
211        _end_of_stream: bool,
212    ) -> RequestHeadersStatus {
213        if self.req_no == 1 {
214            let envoy_filter_instance = self.envoy_filter_instance.clone();
215            let req_no = self.req_no;
216            std::thread::spawn(move || {
217                println!("blocking for 1 second at RequestHeaders with id {}", req_no);
218                std::thread::sleep(std::time::Duration::from_secs(1));
219                println!("calling ContinueRequest with id {}", req_no);
220                if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
221                {
222                    envoy_filter_instance.continue_request();
223                }
224            });
225            println!(
226                "RequestHeaders returning StopAllIterationAndBuffer with id {}",
227                self.req_no
228            );
229            RequestHeadersStatus::StopAllIterationAndBuffer
230        } else {
231            println!("RequestHeaders called with id {}", self.req_no);
232            RequestHeadersStatus::Continue
233        }
234    }
235
236    fn request_body(
237        &mut self,
238        _request_body_frame: &RequestBodyBuffer,
239        _end_of_stream: bool,
240    ) -> RequestBodyStatus {
241        if self.req_no == 2 {
242            let envoy_filter_instance = self.envoy_filter_instance.clone();
243            let req_no = self.req_no;
244            std::thread::spawn(move || {
245                println!("blocking for 1 second at RequestBody with id {}", req_no);
246                std::thread::sleep(std::time::Duration::from_secs(1));
247                println!("calling ContinueRequest with id {}", req_no);
248                if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
249                {
250                    envoy_filter_instance.continue_request();
251                }
252            });
253            println!(
254                "RequestBody returning StopIterationAndBuffer with id {}",
255                self.req_no
256            );
257            RequestBodyStatus::StopIterationAndBuffer
258        } else {
259            println!("RequestBody called with id {}", self.req_no);
260            RequestBodyStatus::Continue
261        }
262    }
263
264    fn response_headers(
265        &mut self,
266        _response_headers: &ResponseHeaders,
267        _end_of_stream: bool,
268    ) -> ResponseHeadersStatus {
269        if self.req_no == 3 {
270            let envoy_filter_instance = self.envoy_filter_instance.clone();
271            let req_no = self.req_no;
272            std::thread::spawn(move || {
273                println!(
274                    "blocking for 1 second at ResponseHeaders with id {}",
275                    req_no
276                );
277                std::thread::sleep(std::time::Duration::from_secs(1));
278                println!("calling ContinueResponse with id {}", req_no);
279                if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
280                {
281                    envoy_filter_instance.continue_response();
282                }
283            });
284            println!(
285                "ResponseHeaders returning StopAllIterationAndBuffer with id {}",
286                self.req_no
287            );
288
289            ResponseHeadersStatus::StopAllIterationAndBuffer
290        } else {
291            println!("ResponseHeaders called with id {}", self.req_no);
292            ResponseHeadersStatus::Continue
293        }
294    }
295
296    fn response_body(
297        &mut self,
298        _response_body_frame: &ResponseBodyBuffer,
299        _end_of_stream: bool,
300    ) -> ResponseBodyStatus {
301        if self.req_no == 4 {
302            let envoy_filter_instance = self.envoy_filter_instance.clone();
303            let req_no = self.req_no;
304            std::thread::spawn(move || {
305                println!("blocking for 1 second at ResponseBody with id {}", req_no);
306                std::thread::sleep(std::time::Duration::from_secs(1));
307                println!("calling ContinueResponse with id {}", req_no);
308                if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
309                {
310                    envoy_filter_instance.continue_response();
311                }
312            });
313            println!(
314                "ResponseBody returning StopIterationAndBuffer with id {}",
315                self.req_no
316            );
317
318            ResponseBodyStatus::StopIterationAndBuffer
319        } else {
320            println!("ResponseBody called with id {}", self.req_no);
321            ResponseBodyStatus::Continue
322        }
323    }
324
325    fn destroy(&mut self) {
326        *self.envoy_filter_instance.lock().unwrap() = None;
327    }
328}
329
330/// BodyFilter is a filter that manipulates request/response bodies.
331///
332/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
333struct BodiesFilter {}
334
335impl HttpFilter for BodiesFilter {
336    fn new_instance(
337        &mut self,
338        envoy_filter_instance: EnvoyFilterInstance,
339    ) -> Box<dyn HttpFilterInstance> {
340        Box::new(BodiesFilterInstance {
341            envoy_filter_instance,
342        })
343    }
344}
345
346/// BodiesFilterInstance is a filter instance that manipulates request/response bodies.
347///
348/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
349struct BodiesFilterInstance {
350    envoy_filter_instance: EnvoyFilterInstance,
351}
352
353impl HttpFilterInstance for BodiesFilterInstance {
354    fn request_body(
355        &mut self,
356        request_body_frame: &RequestBodyBuffer,
357        end_of_stream: bool,
358    ) -> RequestBodyStatus {
359        println!(
360            "new request body frame: {}",
361            String::from_utf8(request_body_frame.copy()).unwrap()
362        );
363        if !end_of_stream {
364            // Wait for the end of the stream to see the full body.
365            return RequestBodyStatus::StopIterationAndBuffer;
366        }
367
368        // Get the entire request body reference - this does not copy the body.
369        let entire_body = self.envoy_filter_instance.get_request_body_buffer();
370        println!(
371            "entire request body: {}",
372            String::from_utf8(entire_body.copy()).unwrap()
373        );
374
375        // This demonstrates how to use the reader to read the body.
376        let mut reader = entire_body.reader();
377        let mut buf = vec![0; 2];
378        let mut offset = 0;
379        loop {
380            let n = reader.read(&mut buf).unwrap();
381            if n == 0 {
382                break;
383            }
384            println!(
385                "request body read 2 bytes offset at {}: \"{}\"",
386                offset,
387                std::str::from_utf8(&buf[..n]).unwrap()
388            );
389            offset += 2;
390        }
391
392        // Replace the entire body with 'Y' without copying.
393        for i in entire_body.slices() {
394            for j in i {
395                *j = b'X';
396            }
397        }
398        RequestBodyStatus::Continue
399    }
400
401    fn response_body(
402        &mut self,
403        response_body_frame: &ResponseBodyBuffer,
404        end_of_stream: bool,
405    ) -> ResponseBodyStatus {
406        println!(
407            "new response body frame: {}",
408            String::from_utf8(response_body_frame.copy()).unwrap()
409        );
410        if !end_of_stream {
411            // Wait for the end of the stream to see the full body.
412            return ResponseBodyStatus::StopIterationAndBuffer;
413        }
414
415        // Get the entire response body reference - this does not copy the body.
416        let entire_body = self.envoy_filter_instance.get_response_body_buffer();
417        println!(
418            "entire response body: {}",
419            String::from_utf8(entire_body.copy()).unwrap()
420        );
421
422        // This demonstrates how to use the reader to read the body.
423        let mut reader = entire_body.reader();
424        let mut buf = vec![0; 2];
425        let mut offset = 0;
426        loop {
427            let n = reader.read(&mut buf).unwrap();
428            if n == 0 {
429                break;
430            }
431            println!(
432                "response body read 2 bytes offset at {}: \"{}\"",
433                offset,
434                std::str::from_utf8(&buf[..n]).unwrap()
435            );
436            offset += 2;
437        }
438
439        // Replace the entire body with 'Y' without copying.
440        for i in entire_body.slices() {
441            for j in i {
442                *j = b'Y';
443            }
444        }
445
446        ResponseBodyStatus::Continue
447    }
448}
449
450/// BodiesReplaceFilter is a filter that replaces request/response bodies.
451///
452/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
453struct BodiesReplace {}
454
455impl HttpFilter for BodiesReplace {
456    fn new_instance(
457        &mut self,
458        envoy_filter_instance: EnvoyFilterInstance,
459    ) -> Box<dyn HttpFilterInstance> {
460        Box::new(BodiesReplaceInstance {
461            envoy_filter_instance,
462            request_append: String::new(),
463            request_prepend: String::new(),
464            request_replace: String::new(),
465            response_append: String::new(),
466            response_prepend: String::new(),
467            response_replace: String::new(),
468        })
469    }
470}
471
472/// BodiesReplaceInstance is a filter instance that replaces request/response bodies.
473///
474/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
475struct BodiesReplaceInstance {
476    envoy_filter_instance: EnvoyFilterInstance,
477    request_append: String,
478    request_prepend: String,
479    request_replace: String,
480    response_append: String,
481    response_prepend: String,
482    response_replace: String,
483}
484
485impl HttpFilterInstance for BodiesReplaceInstance {
486    fn request_headers(
487        &mut self,
488        request_headers: &RequestHeaders,
489        _end_of_stream: bool,
490    ) -> RequestHeadersStatus {
491        if let Some(value) = request_headers.get(b"append") {
492            self.request_append = std::str::from_utf8(value).unwrap().to_string();
493        }
494        if let Some(value) = request_headers.get(b"prepend") {
495            self.request_prepend = std::str::from_utf8(value).unwrap().to_string();
496        }
497        if let Some(value) = request_headers.get(b"replace") {
498            self.request_replace = std::str::from_utf8(value).unwrap().to_string();
499        }
500        request_headers.remove(b"content-length"); // Remove content-length header to avoid mismatch.
501        RequestHeadersStatus::Continue
502    }
503
504    fn request_body(
505        &mut self,
506        _request_body_frame: &RequestBodyBuffer,
507        end_of_stream: bool,
508    ) -> RequestBodyStatus {
509        if !end_of_stream {
510            // Wait for the end of the stream to see the full body.
511            return RequestBodyStatus::StopIterationAndBuffer;
512        }
513
514        let entire_body = self.envoy_filter_instance.get_request_body_buffer();
515        if !self.request_append.is_empty() {
516            entire_body.append(self.request_append.as_bytes());
517        }
518        if !self.request_prepend.is_empty() {
519            entire_body.prepend(self.request_prepend.as_bytes());
520        }
521        if !self.request_replace.is_empty() {
522            entire_body.replace(self.request_replace.as_bytes());
523        }
524        RequestBodyStatus::Continue
525    }
526
527    fn response_headers(
528        &mut self,
529        response_headers: &ResponseHeaders,
530        _end_of_stream: bool,
531    ) -> ResponseHeadersStatus {
532        if let Some(value) = response_headers.get(b"append") {
533            self.response_append = std::str::from_utf8(value).unwrap().to_string();
534        }
535        if let Some(value) = response_headers.get(b"prepend") {
536            self.response_prepend = std::str::from_utf8(value).unwrap().to_string();
537        }
538        if let Some(value) = response_headers.get(b"replace") {
539            self.response_replace = std::str::from_utf8(value).unwrap().to_string();
540        }
541        response_headers.remove(b"content-length"); // Remove content-length header to avoid mismatch.
542        ResponseHeadersStatus::Continue
543    }
544
545    fn response_body(
546        &mut self,
547        _response_body_frame: &ResponseBodyBuffer,
548        end_of_stream: bool,
549    ) -> ResponseBodyStatus {
550        if !end_of_stream {
551            // Wait for the end of the stream to see the full body.
552            return ResponseBodyStatus::StopIterationAndBuffer;
553        }
554
555        let entire_body = self.envoy_filter_instance.get_response_body_buffer();
556        if !self.response_append.is_empty() {
557            entire_body.append(self.response_append.as_bytes());
558        }
559        if !self.response_prepend.is_empty() {
560            entire_body.prepend(self.response_prepend.as_bytes());
561        }
562        if !self.response_replace.is_empty() {
563            entire_body.replace(self.response_replace.as_bytes());
564        }
565        ResponseBodyStatus::Continue
566    }
567}
568
569/// SendResponseFilter is a filter that sends a response.
570///
571/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
572struct SendResponseFilter {}
573
574impl HttpFilter for SendResponseFilter {
575    fn new_instance(
576        &mut self,
577        envoy_filter_instance: EnvoyFilterInstance,
578    ) -> Box<dyn HttpFilterInstance> {
579        Box::new(SendResponseFilterInstance {
580            envoy_filter_instance,
581            on_response_headers: false,
582        })
583    }
584}
585
586/// SendResponseFilterInstance is a filter instance that sends a response.
587///
588/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
589struct SendResponseFilterInstance {
590    envoy_filter_instance: EnvoyFilterInstance,
591    on_response_headers: bool,
592}
593
594impl HttpFilterInstance for SendResponseFilterInstance {
595    fn request_headers(
596        &mut self,
597        request_headers: &RequestHeaders,
598        _end_of_stream: bool,
599    ) -> RequestHeadersStatus {
600        if let Some(value) = request_headers.get(b":path") {
601            if value == "/on_request".as_bytes() {
602                let headers: Vec<(&[u8], &[u8])> = vec![
603                    ("foo".as_bytes(), "bar".as_bytes()),
604                    ("bar".as_bytes(), "baz".as_bytes()),
605                ];
606                self.envoy_filter_instance.send_response(
607                    401,
608                    headers.as_slice(),
609                    "local response at request headers".as_bytes(),
610                );
611            }
612            if value == b"/on_response" {
613                self.on_response_headers = true;
614            }
615        }
616        RequestHeadersStatus::Continue
617    }
618
619    fn response_headers(
620        &mut self,
621        _response_headers: &ResponseHeaders,
622        _end_of_stream: bool,
623    ) -> ResponseHeadersStatus {
624        if self.on_response_headers {
625            let headers: Vec<(&[u8], &[u8])> = vec![("dog".as_bytes(), "cat".as_bytes())];
626            self.envoy_filter_instance.send_response(
627                500,
628                headers.as_slice(),
629                "local response at response headers".as_bytes(),
630            );
631        }
632        ResponseHeadersStatus::Continue
633    }
634}
635
636/// ValidateJsonFilter is a filter that validates JSON.
637///
638/// This implements the [`HttpFilter`] trait, and will be created per each filter chain.
639struct ValidateJsonFilter {}
640
641impl HttpFilter for ValidateJsonFilter {
642    fn new_instance(
643        &mut self,
644        envoy_filter_instance: EnvoyFilterInstance,
645    ) -> Box<dyn HttpFilterInstance> {
646        Box::new(ValidateJsonFilterInstance {
647            envoy_filter_instance,
648        })
649    }
650}
651
652/// ValidateJsonFilterInstance is a filter instance that validates JSON.
653///
654/// This implements the [`HttpFilterInstance`] trait, and will be created per each request.
655struct ValidateJsonFilterInstance {
656    envoy_filter_instance: EnvoyFilterInstance,
657}
658
659#[derive(Serialize, Deserialize)]
660struct ValidateJsonFilterBody {
661    foo: String,
662}
663
664impl HttpFilterInstance for ValidateJsonFilterInstance {
665    fn request_body(
666        &mut self,
667        _request_body_frame: &RequestBodyBuffer,
668        end_of_stream: bool,
669    ) -> RequestBodyStatus {
670        if !end_of_stream {
671            // Wait for the end of the stream to see the full body.
672            return RequestBodyStatus::StopIterationAndBuffer;
673        }
674
675        let reader = self
676            .envoy_filter_instance
677            .get_request_body_buffer()
678            .reader();
679
680        match serde_json::from_reader(reader) {
681            Ok(body) => {
682                let _body: ValidateJsonFilterBody = body;
683            }
684            Err(_e) => {
685                self.envoy_filter_instance.send_response(400, &[], &[]);
686            }
687        }
688        RequestBodyStatus::Continue
689    }
690}