1use std::{
2 io::Read,
3 sync::{Arc, Mutex},
4};
5
6use envoy_dynamic_modules_rust_sdk::*;
7use serde::{Deserialize, Serialize};
8
9init!(new_http_filter);
10
11fn new_http_filter(config: &str) -> Box<dyn HttpFilter> {
15 match config {
17 "helloworld" => Box::new(HelloWorldFilter {}),
18 "delay" => Box::new(DelayFilter {
19 atomic: std::sync::atomic::AtomicUsize::new(1),
20 }),
21 "headers" => Box::new(HeadersFilter {}),
22 "bodies" => Box::new(BodiesFilter {}),
23 "bodies_replace" => Box::new(BodiesReplace {}),
24 "send_response" => Box::new(SendResponseFilter {}),
25 "validate_json" => Box::new(ValidateJsonFilter {}),
26 _ => panic!("Unknown config: {}", config),
27 }
28}
29
30struct HelloWorldFilter {}
34
35impl HttpFilter for HelloWorldFilter {
36 fn new_instance(
37 &mut self,
38 _envoy_filter_instance: EnvoyFilterInstance,
39 ) -> Box<dyn HttpFilterInstance> {
40 Box::new(HelloWorldFilterInstance {})
41 }
42
43 fn destroy(&self) {
44 println!("HelloWorldFilter destroyed");
45 }
46}
47
48struct HelloWorldFilterInstance {}
52
53impl HttpFilterInstance for HelloWorldFilterInstance {
54 fn request_headers(
55 &mut self,
56 _request_headers: &RequestHeaders,
57 _end_of_stream: bool,
58 ) -> RequestHeadersStatus {
59 println!("RequestHeaders called");
60 RequestHeadersStatus::Continue
61 }
62
63 fn request_body(
64 &mut self,
65 _request_body_frame: &RequestBodyBuffer,
66 _end_of_stream: bool,
67 ) -> RequestBodyStatus {
68 println!("RequestBody called");
69 RequestBodyStatus::Continue
70 }
71
72 fn response_headers(
73 &mut self,
74 _response_headers: &ResponseHeaders,
75 _end_of_stream: bool,
76 ) -> ResponseHeadersStatus {
77 println!("ResponseHeaders called");
78 ResponseHeadersStatus::Continue
79 }
80
81 fn response_body(
82 &mut self,
83 _response_body_frame: &ResponseBodyBuffer,
84 _end_of_stream: bool,
85 ) -> ResponseBodyStatus {
86 println!("ResponseBody called");
87 ResponseBodyStatus::Continue
88 }
89
90 fn destroy(&mut self) {
91 println!("Destroy called");
92 }
93}
94
95struct HeadersFilter {}
99
100impl HttpFilter for HeadersFilter {
101 fn new_instance(
102 &mut self,
103 _envoy_filter_instance: EnvoyFilterInstance,
104 ) -> Box<dyn HttpFilterInstance> {
105 Box::new(HeadersFilterInstance {})
106 }
107}
108
109struct HeadersFilterInstance {}
113
114impl HttpFilterInstance for HeadersFilterInstance {
115 fn request_headers(
116 &mut self,
117 request_headers: &RequestHeaders,
118 _end_of_stream: bool,
119 ) -> RequestHeadersStatus {
120 if let Some(value) = request_headers.get(b"foo") {
121 if value != b"value" {
122 panic!(
123 "expected this-is to be \"value\", got {:?}",
124 std::str::from_utf8(value).unwrap()
125 );
126 } else {
127 println!("foo: {}", std::str::from_utf8(value).unwrap());
128 }
129 }
130
131 request_headers
132 .values(b"multiple-values")
133 .iter()
134 .for_each(|value| {
135 println!("multiple-values: {}", std::str::from_utf8(value).unwrap());
136 });
137
138 request_headers.remove(b"multiple-values");
139 request_headers.set(b"foo", b"yes");
140 request_headers.set(b"multiple-values-to-be-single", b"single");
141 RequestHeadersStatus::Continue
142 }
143
144 fn response_headers(
145 &mut self,
146 response_headers: &ResponseHeaders,
147 _end_of_stream: bool,
148 ) -> ResponseHeadersStatus {
149 if let Some(value) = response_headers.get(b"this-is") {
150 if value != b"response-header" {
151 panic!(
152 "expected this-is to be \"response-header\", got {:?}",
153 value
154 );
155 } else {
156 println!("this-is: {}", std::str::from_utf8(value).unwrap());
157 }
158 }
159
160 response_headers
161 .values(b"this-is-2")
162 .iter()
163 .for_each(|value| {
164 println!("this-is-2: {}", std::str::from_utf8(value).unwrap());
165 });
166
167 response_headers.remove(b"this-is-2");
168 response_headers.set(b"this-is", b"response-header");
169 response_headers.set(b"multiple-values-res-to-be-single", b"single");
170
171 ResponseHeadersStatus::Continue
172 }
173}
174
175struct DelayFilter {
179 atomic: std::sync::atomic::AtomicUsize,
180}
181
182impl HttpFilter for DelayFilter {
183 fn new_instance(
184 &mut self,
185 envoy_filter_instance: EnvoyFilterInstance,
186 ) -> Box<dyn HttpFilterInstance> {
187 let req_no = self
188 .atomic
189 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
190
191 let envoy_filter_instance = Arc::new(Mutex::new(Some(envoy_filter_instance)));
192 Box::new(DelayFilterInstance {
193 req_no,
194 envoy_filter_instance,
195 })
196 }
197}
198
199struct DelayFilterInstance {
203 req_no: usize,
204 envoy_filter_instance: Arc<Mutex<Option<EnvoyFilterInstance>>>,
205}
206
207impl HttpFilterInstance for DelayFilterInstance {
208 fn request_headers(
209 &mut self,
210 _request_headers: &RequestHeaders,
211 _end_of_stream: bool,
212 ) -> RequestHeadersStatus {
213 if self.req_no == 1 {
214 let envoy_filter_instance = self.envoy_filter_instance.clone();
215 let req_no = self.req_no;
216 std::thread::spawn(move || {
217 println!("blocking for 1 second at RequestHeaders with id {}", req_no);
218 std::thread::sleep(std::time::Duration::from_secs(1));
219 println!("calling ContinueRequest with id {}", req_no);
220 if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
221 {
222 envoy_filter_instance.continue_request();
223 }
224 });
225 println!(
226 "RequestHeaders returning StopAllIterationAndBuffer with id {}",
227 self.req_no
228 );
229 RequestHeadersStatus::StopAllIterationAndBuffer
230 } else {
231 println!("RequestHeaders called with id {}", self.req_no);
232 RequestHeadersStatus::Continue
233 }
234 }
235
236 fn request_body(
237 &mut self,
238 _request_body_frame: &RequestBodyBuffer,
239 _end_of_stream: bool,
240 ) -> RequestBodyStatus {
241 if self.req_no == 2 {
242 let envoy_filter_instance = self.envoy_filter_instance.clone();
243 let req_no = self.req_no;
244 std::thread::spawn(move || {
245 println!("blocking for 1 second at RequestBody with id {}", req_no);
246 std::thread::sleep(std::time::Duration::from_secs(1));
247 println!("calling ContinueRequest with id {}", req_no);
248 if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
249 {
250 envoy_filter_instance.continue_request();
251 }
252 });
253 println!(
254 "RequestBody returning StopIterationAndBuffer with id {}",
255 self.req_no
256 );
257 RequestBodyStatus::StopIterationAndBuffer
258 } else {
259 println!("RequestBody called with id {}", self.req_no);
260 RequestBodyStatus::Continue
261 }
262 }
263
264 fn response_headers(
265 &mut self,
266 _response_headers: &ResponseHeaders,
267 _end_of_stream: bool,
268 ) -> ResponseHeadersStatus {
269 if self.req_no == 3 {
270 let envoy_filter_instance = self.envoy_filter_instance.clone();
271 let req_no = self.req_no;
272 std::thread::spawn(move || {
273 println!(
274 "blocking for 1 second at ResponseHeaders with id {}",
275 req_no
276 );
277 std::thread::sleep(std::time::Duration::from_secs(1));
278 println!("calling ContinueResponse with id {}", req_no);
279 if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
280 {
281 envoy_filter_instance.continue_response();
282 }
283 });
284 println!(
285 "ResponseHeaders returning StopAllIterationAndBuffer with id {}",
286 self.req_no
287 );
288
289 ResponseHeadersStatus::StopAllIterationAndBuffer
290 } else {
291 println!("ResponseHeaders called with id {}", self.req_no);
292 ResponseHeadersStatus::Continue
293 }
294 }
295
296 fn response_body(
297 &mut self,
298 _response_body_frame: &ResponseBodyBuffer,
299 _end_of_stream: bool,
300 ) -> ResponseBodyStatus {
301 if self.req_no == 4 {
302 let envoy_filter_instance = self.envoy_filter_instance.clone();
303 let req_no = self.req_no;
304 std::thread::spawn(move || {
305 println!("blocking for 1 second at ResponseBody with id {}", req_no);
306 std::thread::sleep(std::time::Duration::from_secs(1));
307 println!("calling ContinueResponse with id {}", req_no);
308 if let Some(envoy_filter_instance) = envoy_filter_instance.lock().unwrap().as_ref()
309 {
310 envoy_filter_instance.continue_response();
311 }
312 });
313 println!(
314 "ResponseBody returning StopIterationAndBuffer with id {}",
315 self.req_no
316 );
317
318 ResponseBodyStatus::StopIterationAndBuffer
319 } else {
320 println!("ResponseBody called with id {}", self.req_no);
321 ResponseBodyStatus::Continue
322 }
323 }
324
325 fn destroy(&mut self) {
326 *self.envoy_filter_instance.lock().unwrap() = None;
327 }
328}
329
330struct BodiesFilter {}
334
335impl HttpFilter for BodiesFilter {
336 fn new_instance(
337 &mut self,
338 envoy_filter_instance: EnvoyFilterInstance,
339 ) -> Box<dyn HttpFilterInstance> {
340 Box::new(BodiesFilterInstance {
341 envoy_filter_instance,
342 })
343 }
344}
345
346struct BodiesFilterInstance {
350 envoy_filter_instance: EnvoyFilterInstance,
351}
352
353impl HttpFilterInstance for BodiesFilterInstance {
354 fn request_body(
355 &mut self,
356 request_body_frame: &RequestBodyBuffer,
357 end_of_stream: bool,
358 ) -> RequestBodyStatus {
359 println!(
360 "new request body frame: {}",
361 String::from_utf8(request_body_frame.copy()).unwrap()
362 );
363 if !end_of_stream {
364 return RequestBodyStatus::StopIterationAndBuffer;
366 }
367
368 let entire_body = self.envoy_filter_instance.get_request_body_buffer();
370 println!(
371 "entire request body: {}",
372 String::from_utf8(entire_body.copy()).unwrap()
373 );
374
375 let mut reader = entire_body.reader();
377 let mut buf = vec![0; 2];
378 let mut offset = 0;
379 loop {
380 let n = reader.read(&mut buf).unwrap();
381 if n == 0 {
382 break;
383 }
384 println!(
385 "request body read 2 bytes offset at {}: \"{}\"",
386 offset,
387 std::str::from_utf8(&buf[..n]).unwrap()
388 );
389 offset += 2;
390 }
391
392 for i in entire_body.slices() {
394 for j in i {
395 *j = b'X';
396 }
397 }
398 RequestBodyStatus::Continue
399 }
400
401 fn response_body(
402 &mut self,
403 response_body_frame: &ResponseBodyBuffer,
404 end_of_stream: bool,
405 ) -> ResponseBodyStatus {
406 println!(
407 "new response body frame: {}",
408 String::from_utf8(response_body_frame.copy()).unwrap()
409 );
410 if !end_of_stream {
411 return ResponseBodyStatus::StopIterationAndBuffer;
413 }
414
415 let entire_body = self.envoy_filter_instance.get_response_body_buffer();
417 println!(
418 "entire response body: {}",
419 String::from_utf8(entire_body.copy()).unwrap()
420 );
421
422 let mut reader = entire_body.reader();
424 let mut buf = vec![0; 2];
425 let mut offset = 0;
426 loop {
427 let n = reader.read(&mut buf).unwrap();
428 if n == 0 {
429 break;
430 }
431 println!(
432 "response body read 2 bytes offset at {}: \"{}\"",
433 offset,
434 std::str::from_utf8(&buf[..n]).unwrap()
435 );
436 offset += 2;
437 }
438
439 for i in entire_body.slices() {
441 for j in i {
442 *j = b'Y';
443 }
444 }
445
446 ResponseBodyStatus::Continue
447 }
448}
449
450struct BodiesReplace {}
454
455impl HttpFilter for BodiesReplace {
456 fn new_instance(
457 &mut self,
458 envoy_filter_instance: EnvoyFilterInstance,
459 ) -> Box<dyn HttpFilterInstance> {
460 Box::new(BodiesReplaceInstance {
461 envoy_filter_instance,
462 request_append: String::new(),
463 request_prepend: String::new(),
464 request_replace: String::new(),
465 response_append: String::new(),
466 response_prepend: String::new(),
467 response_replace: String::new(),
468 })
469 }
470}
471
472struct BodiesReplaceInstance {
476 envoy_filter_instance: EnvoyFilterInstance,
477 request_append: String,
478 request_prepend: String,
479 request_replace: String,
480 response_append: String,
481 response_prepend: String,
482 response_replace: String,
483}
484
485impl HttpFilterInstance for BodiesReplaceInstance {
486 fn request_headers(
487 &mut self,
488 request_headers: &RequestHeaders,
489 _end_of_stream: bool,
490 ) -> RequestHeadersStatus {
491 if let Some(value) = request_headers.get(b"append") {
492 self.request_append = std::str::from_utf8(value).unwrap().to_string();
493 }
494 if let Some(value) = request_headers.get(b"prepend") {
495 self.request_prepend = std::str::from_utf8(value).unwrap().to_string();
496 }
497 if let Some(value) = request_headers.get(b"replace") {
498 self.request_replace = std::str::from_utf8(value).unwrap().to_string();
499 }
500 request_headers.remove(b"content-length"); RequestHeadersStatus::Continue
502 }
503
504 fn request_body(
505 &mut self,
506 _request_body_frame: &RequestBodyBuffer,
507 end_of_stream: bool,
508 ) -> RequestBodyStatus {
509 if !end_of_stream {
510 return RequestBodyStatus::StopIterationAndBuffer;
512 }
513
514 let entire_body = self.envoy_filter_instance.get_request_body_buffer();
515 if !self.request_append.is_empty() {
516 entire_body.append(self.request_append.as_bytes());
517 }
518 if !self.request_prepend.is_empty() {
519 entire_body.prepend(self.request_prepend.as_bytes());
520 }
521 if !self.request_replace.is_empty() {
522 entire_body.replace(self.request_replace.as_bytes());
523 }
524 RequestBodyStatus::Continue
525 }
526
527 fn response_headers(
528 &mut self,
529 response_headers: &ResponseHeaders,
530 _end_of_stream: bool,
531 ) -> ResponseHeadersStatus {
532 if let Some(value) = response_headers.get(b"append") {
533 self.response_append = std::str::from_utf8(value).unwrap().to_string();
534 }
535 if let Some(value) = response_headers.get(b"prepend") {
536 self.response_prepend = std::str::from_utf8(value).unwrap().to_string();
537 }
538 if let Some(value) = response_headers.get(b"replace") {
539 self.response_replace = std::str::from_utf8(value).unwrap().to_string();
540 }
541 response_headers.remove(b"content-length"); ResponseHeadersStatus::Continue
543 }
544
545 fn response_body(
546 &mut self,
547 _response_body_frame: &ResponseBodyBuffer,
548 end_of_stream: bool,
549 ) -> ResponseBodyStatus {
550 if !end_of_stream {
551 return ResponseBodyStatus::StopIterationAndBuffer;
553 }
554
555 let entire_body = self.envoy_filter_instance.get_response_body_buffer();
556 if !self.response_append.is_empty() {
557 entire_body.append(self.response_append.as_bytes());
558 }
559 if !self.response_prepend.is_empty() {
560 entire_body.prepend(self.response_prepend.as_bytes());
561 }
562 if !self.response_replace.is_empty() {
563 entire_body.replace(self.response_replace.as_bytes());
564 }
565 ResponseBodyStatus::Continue
566 }
567}
568
569struct SendResponseFilter {}
573
574impl HttpFilter for SendResponseFilter {
575 fn new_instance(
576 &mut self,
577 envoy_filter_instance: EnvoyFilterInstance,
578 ) -> Box<dyn HttpFilterInstance> {
579 Box::new(SendResponseFilterInstance {
580 envoy_filter_instance,
581 on_response_headers: false,
582 })
583 }
584}
585
586struct SendResponseFilterInstance {
590 envoy_filter_instance: EnvoyFilterInstance,
591 on_response_headers: bool,
592}
593
594impl HttpFilterInstance for SendResponseFilterInstance {
595 fn request_headers(
596 &mut self,
597 request_headers: &RequestHeaders,
598 _end_of_stream: bool,
599 ) -> RequestHeadersStatus {
600 if let Some(value) = request_headers.get(b":path") {
601 if value == "/on_request".as_bytes() {
602 let headers: Vec<(&[u8], &[u8])> = vec![
603 ("foo".as_bytes(), "bar".as_bytes()),
604 ("bar".as_bytes(), "baz".as_bytes()),
605 ];
606 self.envoy_filter_instance.send_response(
607 401,
608 headers.as_slice(),
609 "local response at request headers".as_bytes(),
610 );
611 }
612 if value == b"/on_response" {
613 self.on_response_headers = true;
614 }
615 }
616 RequestHeadersStatus::Continue
617 }
618
619 fn response_headers(
620 &mut self,
621 _response_headers: &ResponseHeaders,
622 _end_of_stream: bool,
623 ) -> ResponseHeadersStatus {
624 if self.on_response_headers {
625 let headers: Vec<(&[u8], &[u8])> = vec![("dog".as_bytes(), "cat".as_bytes())];
626 self.envoy_filter_instance.send_response(
627 500,
628 headers.as_slice(),
629 "local response at response headers".as_bytes(),
630 );
631 }
632 ResponseHeadersStatus::Continue
633 }
634}
635
636struct ValidateJsonFilter {}
640
641impl HttpFilter for ValidateJsonFilter {
642 fn new_instance(
643 &mut self,
644 envoy_filter_instance: EnvoyFilterInstance,
645 ) -> Box<dyn HttpFilterInstance> {
646 Box::new(ValidateJsonFilterInstance {
647 envoy_filter_instance,
648 })
649 }
650}
651
652struct ValidateJsonFilterInstance {
656 envoy_filter_instance: EnvoyFilterInstance,
657}
658
659#[derive(Serialize, Deserialize)]
660struct ValidateJsonFilterBody {
661 foo: String,
662}
663
664impl HttpFilterInstance for ValidateJsonFilterInstance {
665 fn request_body(
666 &mut self,
667 _request_body_frame: &RequestBodyBuffer,
668 end_of_stream: bool,
669 ) -> RequestBodyStatus {
670 if !end_of_stream {
671 return RequestBodyStatus::StopIterationAndBuffer;
673 }
674
675 let reader = self
676 .envoy_filter_instance
677 .get_request_body_buffer()
678 .reader();
679
680 match serde_json::from_reader(reader) {
681 Ok(body) => {
682 let _body: ValidateJsonFilterBody = body;
683 }
684 Err(_e) => {
685 self.envoy_filter_instance.send_response(400, &[], &[]);
686 }
687 }
688 RequestBodyStatus::Continue
689 }
690}