proxy_wasm/
dispatcher.rs

1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::hostcalls;
16use crate::traits::*;
17use crate::types::*;
18use hashbrown::HashMap;
19use log::trace;
20use std::cell::{Cell, RefCell};
21
22thread_local! {
23static DISPATCHER: Dispatcher = Dispatcher::new();
24}
25
26pub(crate) fn set_root_context(callback: NewRootContext) {
27    DISPATCHER.with(|dispatcher| dispatcher.set_root_context(callback));
28}
29
30pub(crate) fn set_stream_context(callback: NewStreamContext) {
31    DISPATCHER.with(|dispatcher| dispatcher.set_stream_context(callback));
32}
33
34pub(crate) fn set_http_context(callback: NewHttpContext) {
35    DISPATCHER.with(|dispatcher| dispatcher.set_http_context(callback));
36}
37
38pub(crate) fn register_callout(token_id: u32) {
39    DISPATCHER.with(|dispatcher| dispatcher.register_callout(token_id));
40}
41
42pub(crate) fn register_grpc_callout(token_id: u32) {
43    DISPATCHER.with(|dispatcher| dispatcher.register_grpc_callout(token_id));
44}
45
46pub(crate) fn register_grpc_stream(token_id: u32) {
47    DISPATCHER.with(|dispatcher| dispatcher.register_grpc_stream(token_id));
48}
49
50struct NoopRoot;
51
52impl Context for NoopRoot {}
53impl RootContext for NoopRoot {}
54
55struct Dispatcher {
56    new_root: Cell<Option<NewRootContext>>,
57    roots: RefCell<HashMap<u32, Box<dyn RootContext>>>,
58    new_stream: Cell<Option<NewStreamContext>>,
59    streams: RefCell<HashMap<u32, Box<dyn StreamContext>>>,
60    new_http_stream: Cell<Option<NewHttpContext>>,
61    http_streams: RefCell<HashMap<u32, Box<dyn HttpContext>>>,
62    active_id: Cell<u32>,
63    callouts: RefCell<HashMap<u32, u32>>,
64    grpc_callouts: RefCell<HashMap<u32, u32>>,
65    grpc_streams: RefCell<HashMap<u32, u32>>,
66}
67
68impl Dispatcher {
69    fn new() -> Dispatcher {
70        Dispatcher {
71            new_root: Cell::new(None),
72            roots: RefCell::new(HashMap::new()),
73            new_stream: Cell::new(None),
74            streams: RefCell::new(HashMap::new()),
75            new_http_stream: Cell::new(None),
76            http_streams: RefCell::new(HashMap::new()),
77            active_id: Cell::new(0),
78            callouts: RefCell::new(HashMap::new()),
79            grpc_callouts: RefCell::new(HashMap::new()),
80            grpc_streams: RefCell::new(HashMap::new()),
81        }
82    }
83
84    fn set_root_context(&self, callback: NewRootContext) {
85        self.new_root.set(Some(callback));
86    }
87
88    fn set_stream_context(&self, callback: NewStreamContext) {
89        self.new_stream.set(Some(callback));
90    }
91
92    fn set_http_context(&self, callback: NewHttpContext) {
93        self.new_http_stream.set(Some(callback));
94    }
95
96    fn register_callout(&self, token_id: u32) {
97        if self
98            .callouts
99            .borrow_mut()
100            .insert(token_id, self.active_id.get())
101            .is_some()
102        {
103            panic!("duplicate token_id")
104        }
105    }
106
107    fn register_grpc_stream(&self, token_id: u32) {
108        if self
109            .grpc_streams
110            .borrow_mut()
111            .insert(token_id, self.active_id.get())
112            .is_some()
113        {
114            panic!("duplicate token_id")
115        }
116    }
117
118    fn register_grpc_callout(&self, token_id: u32) {
119        if self
120            .grpc_callouts
121            .borrow_mut()
122            .insert(token_id, self.active_id.get())
123            .is_some()
124        {
125            panic!("duplicate token_id")
126        }
127    }
128
129    fn create_root_context(&self, context_id: u32) {
130        let new_context = match self.new_root.get() {
131            Some(f) => f(context_id),
132            None => Box::new(NoopRoot),
133        };
134        if self
135            .roots
136            .borrow_mut()
137            .insert(context_id, new_context)
138            .is_some()
139        {
140            panic!("duplicate context_id")
141        }
142    }
143
144    fn create_stream_context(&self, context_id: u32, root_context_id: u32) {
145        let new_context = match self.roots.borrow().get(&root_context_id) {
146            Some(root_context) => match self.new_stream.get() {
147                Some(f) => f(context_id, root_context_id),
148                None => match root_context.create_stream_context(context_id) {
149                    Some(stream_context) => stream_context,
150                    None => panic!("create_stream_context returned None"),
151                },
152            },
153            None => panic!("invalid root_context_id"),
154        };
155        if self
156            .streams
157            .borrow_mut()
158            .insert(context_id, new_context)
159            .is_some()
160        {
161            panic!("duplicate context_id")
162        }
163    }
164
165    fn create_http_context(&self, context_id: u32, root_context_id: u32) {
166        let new_context = match self.roots.borrow().get(&root_context_id) {
167            Some(root_context) => match self.new_http_stream.get() {
168                Some(f) => f(context_id, root_context_id),
169                None => match root_context.create_http_context(context_id) {
170                    Some(stream_context) => stream_context,
171                    None => panic!("create_http_context returned None"),
172                },
173            },
174            None => panic!("invalid root_context_id"),
175        };
176        if self
177            .http_streams
178            .borrow_mut()
179            .insert(context_id, new_context)
180            .is_some()
181        {
182            panic!("duplicate context_id")
183        }
184    }
185
186    fn on_create_context(&self, context_id: u32, root_context_id: u32) {
187        if root_context_id == 0 {
188            self.create_root_context(context_id);
189        } else if self.new_http_stream.get().is_some() {
190            self.create_http_context(context_id, root_context_id);
191        } else if self.new_stream.get().is_some() {
192            self.create_stream_context(context_id, root_context_id);
193        } else if let Some(root_context) = self.roots.borrow().get(&root_context_id) {
194            match root_context.get_type() {
195                Some(ContextType::HttpContext) => {
196                    self.create_http_context(context_id, root_context_id)
197                }
198                Some(ContextType::StreamContext) => {
199                    self.create_stream_context(context_id, root_context_id)
200                }
201                None => panic!("missing ContextType on root_context"),
202            }
203        } else {
204            panic!("invalid root_context_id and missing constructors");
205        }
206    }
207
208    fn on_done(&self, context_id: u32) -> bool {
209        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
210            self.active_id.set(context_id);
211            http_stream.on_done()
212        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
213            self.active_id.set(context_id);
214            stream.on_done()
215        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
216            self.active_id.set(context_id);
217            root.on_done()
218        } else {
219            panic!("invalid context_id")
220        }
221    }
222
223    fn on_log(&self, context_id: u32) {
224        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
225            self.active_id.set(context_id);
226            http_stream.on_log()
227        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
228            self.active_id.set(context_id);
229            stream.on_log()
230        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
231            self.active_id.set(context_id);
232            root.on_log()
233        } else {
234            panic!("invalid context_id")
235        }
236    }
237
238    fn on_delete(&self, context_id: u32) {
239        if !(self.http_streams.borrow_mut().remove(&context_id).is_some()
240            || self.streams.borrow_mut().remove(&context_id).is_some()
241            || self.roots.borrow_mut().remove(&context_id).is_some())
242        {
243            panic!("invalid context_id")
244        }
245    }
246
247    fn on_vm_start(&self, context_id: u32, vm_configuration_size: usize) -> bool {
248        if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
249            self.active_id.set(context_id);
250            root.on_vm_start(vm_configuration_size)
251        } else {
252            panic!("invalid context_id")
253        }
254    }
255
256    fn on_configure(&self, context_id: u32, plugin_configuration_size: usize) -> bool {
257        if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
258            self.active_id.set(context_id);
259            root.on_configure(plugin_configuration_size)
260        } else {
261            panic!("invalid context_id")
262        }
263    }
264
265    fn on_tick(&self, context_id: u32) {
266        if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
267            self.active_id.set(context_id);
268            root.on_tick()
269        } else {
270            panic!("invalid context_id")
271        }
272    }
273
274    fn on_queue_ready(&self, context_id: u32, queue_id: u32) {
275        if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
276            self.active_id.set(context_id);
277            root.on_queue_ready(queue_id)
278        } else {
279            panic!("invalid context_id")
280        }
281    }
282
283    fn on_new_connection(&self, context_id: u32) -> Action {
284        if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
285            self.active_id.set(context_id);
286            stream.on_new_connection()
287        } else {
288            panic!("invalid context_id")
289        }
290    }
291
292    fn on_downstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> Action {
293        if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
294            self.active_id.set(context_id);
295            stream.on_downstream_data(data_size, end_of_stream)
296        } else {
297            panic!("invalid context_id")
298        }
299    }
300
301    fn on_downstream_close(&self, context_id: u32, peer_type: PeerType) {
302        if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
303            self.active_id.set(context_id);
304            stream.on_downstream_close(peer_type)
305        } else {
306            panic!("invalid context_id")
307        }
308    }
309
310    fn on_upstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> Action {
311        if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
312            self.active_id.set(context_id);
313            stream.on_upstream_data(data_size, end_of_stream)
314        } else {
315            panic!("invalid context_id")
316        }
317    }
318
319    fn on_upstream_close(&self, context_id: u32, peer_type: PeerType) {
320        if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
321            self.active_id.set(context_id);
322            stream.on_upstream_close(peer_type)
323        } else {
324            panic!("invalid context_id")
325        }
326    }
327
328    fn on_http_request_headers(
329        &self,
330        context_id: u32,
331        num_headers: usize,
332        end_of_stream: bool,
333    ) -> Action {
334        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
335            self.active_id.set(context_id);
336            http_stream.on_http_request_headers(num_headers, end_of_stream)
337        } else {
338            panic!("invalid context_id")
339        }
340    }
341
342    fn on_http_request_body(
343        &self,
344        context_id: u32,
345        body_size: usize,
346        end_of_stream: bool,
347    ) -> Action {
348        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
349            self.active_id.set(context_id);
350            http_stream.on_http_request_body(body_size, end_of_stream)
351        } else {
352            panic!("invalid context_id")
353        }
354    }
355
356    fn on_http_request_trailers(&self, context_id: u32, num_trailers: usize) -> Action {
357        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
358            self.active_id.set(context_id);
359            http_stream.on_http_request_trailers(num_trailers)
360        } else {
361            panic!("invalid context_id")
362        }
363    }
364
365    fn on_http_response_headers(
366        &self,
367        context_id: u32,
368        num_headers: usize,
369        end_of_stream: bool,
370    ) -> Action {
371        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
372            self.active_id.set(context_id);
373            http_stream.on_http_response_headers(num_headers, end_of_stream)
374        } else {
375            panic!("invalid context_id")
376        }
377    }
378
379    fn on_http_response_body(
380        &self,
381        context_id: u32,
382        body_size: usize,
383        end_of_stream: bool,
384    ) -> Action {
385        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
386            self.active_id.set(context_id);
387            http_stream.on_http_response_body(body_size, end_of_stream)
388        } else {
389            panic!("invalid context_id")
390        }
391    }
392
393    fn on_http_response_trailers(&self, context_id: u32, num_trailers: usize) -> Action {
394        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
395            self.active_id.set(context_id);
396            http_stream.on_http_response_trailers(num_trailers)
397        } else {
398            panic!("invalid context_id")
399        }
400    }
401
402    fn on_http_call_response(
403        &self,
404        token_id: u32,
405        num_headers: usize,
406        body_size: usize,
407        num_trailers: usize,
408    ) {
409        let context_id = self
410            .callouts
411            .borrow_mut()
412            .remove(&token_id)
413            .expect("invalid token_id");
414
415        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
416            self.active_id.set(context_id);
417            hostcalls::set_effective_context(context_id).unwrap();
418            http_stream.on_http_call_response(token_id, num_headers, body_size, num_trailers)
419        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
420            self.active_id.set(context_id);
421            hostcalls::set_effective_context(context_id).unwrap();
422            stream.on_http_call_response(token_id, num_headers, body_size, num_trailers)
423        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
424            self.active_id.set(context_id);
425            hostcalls::set_effective_context(context_id).unwrap();
426            root.on_http_call_response(token_id, num_headers, body_size, num_trailers)
427        }
428    }
429
430    fn on_grpc_receive_initial_metadata(&self, token_id: u32, headers: u32) {
431        let context_id = match self.grpc_streams.borrow_mut().get(&token_id) {
432            Some(id) => *id,
433            None => {
434                // TODO: change back to a panic once underlying issue is fixed.
435                trace!("on_grpc_receive_initial_metadata: invalid token_id");
436                return;
437            }
438        };
439
440        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
441            self.active_id.set(context_id);
442            hostcalls::set_effective_context(context_id).unwrap();
443            http_stream.on_grpc_stream_initial_metadata(token_id, headers);
444        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
445            self.active_id.set(context_id);
446            hostcalls::set_effective_context(context_id).unwrap();
447            stream.on_grpc_stream_initial_metadata(token_id, headers);
448        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
449            self.active_id.set(context_id);
450            hostcalls::set_effective_context(context_id).unwrap();
451            root.on_grpc_stream_initial_metadata(token_id, headers);
452        }
453    }
454
455    fn on_grpc_receive(&self, token_id: u32, response_size: usize) {
456        let context_id = self.grpc_callouts.borrow_mut().remove(&token_id);
457        if let Some(context_id) = context_id {
458            if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
459                self.active_id.set(context_id);
460                hostcalls::set_effective_context(context_id).unwrap();
461                http_stream.on_grpc_call_response(token_id, 0, response_size);
462            } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
463                self.active_id.set(context_id);
464                hostcalls::set_effective_context(context_id).unwrap();
465                stream.on_grpc_call_response(token_id, 0, response_size);
466            } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
467                self.active_id.set(context_id);
468                hostcalls::set_effective_context(context_id).unwrap();
469                root.on_grpc_call_response(token_id, 0, response_size);
470            }
471        } else {
472            let context_id = self.grpc_streams.borrow().get(&token_id).cloned();
473            if let Some(context_id) = context_id {
474                if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
475                    self.active_id.set(context_id);
476                    hostcalls::set_effective_context(context_id).unwrap();
477                    http_stream.on_grpc_stream_message(token_id, response_size);
478                } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
479                    self.active_id.set(context_id);
480                    hostcalls::set_effective_context(context_id).unwrap();
481                    stream.on_grpc_stream_message(token_id, response_size);
482                } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
483                    self.active_id.set(context_id);
484                    hostcalls::set_effective_context(context_id).unwrap();
485                    root.on_grpc_stream_message(token_id, response_size);
486                }
487            } else {
488                // TODO: change back to a panic once underlying issue is fixed.
489                trace!("on_grpc_receive_initial_metadata: invalid token_id");
490            }
491        }
492    }
493
494    fn on_grpc_receive_trailing_metadata(&self, token_id: u32, trailers: u32) {
495        let context_id = match self.grpc_streams.borrow_mut().get(&token_id) {
496            Some(id) => *id,
497            None => {
498                // TODO: change back to a panic once underlying issue is fixed.
499                trace!("on_grpc_receive_trailing_metadata: invalid token_id");
500                return;
501            }
502        };
503
504        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
505            self.active_id.set(context_id);
506            hostcalls::set_effective_context(context_id).unwrap();
507            http_stream.on_grpc_stream_trailing_metadata(token_id, trailers);
508        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
509            self.active_id.set(context_id);
510            hostcalls::set_effective_context(context_id).unwrap();
511            stream.on_grpc_stream_trailing_metadata(token_id, trailers);
512        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
513            self.active_id.set(context_id);
514            hostcalls::set_effective_context(context_id).unwrap();
515            root.on_grpc_stream_trailing_metadata(token_id, trailers);
516        }
517    }
518
519    fn on_grpc_close(&self, token_id: u32, status_code: u32) {
520        let context_id = self.grpc_callouts.borrow_mut().remove(&token_id);
521        if let Some(context_id) = context_id {
522            if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
523                self.active_id.set(context_id);
524                hostcalls::set_effective_context(context_id).unwrap();
525                http_stream.on_grpc_call_response(token_id, status_code, 0);
526            } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
527                self.active_id.set(context_id);
528                hostcalls::set_effective_context(context_id).unwrap();
529                stream.on_grpc_call_response(token_id, status_code, 0);
530            } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
531                self.active_id.set(context_id);
532                hostcalls::set_effective_context(context_id).unwrap();
533                root.on_grpc_call_response(token_id, status_code, 0);
534            }
535        } else {
536            let context_id = self.grpc_streams.borrow_mut().remove(&token_id);
537            if let Some(context_id) = context_id {
538                if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
539                    self.active_id.set(context_id);
540                    hostcalls::set_effective_context(context_id).unwrap();
541                    http_stream.on_grpc_stream_close(token_id, status_code)
542                } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
543                    self.active_id.set(context_id);
544                    hostcalls::set_effective_context(context_id).unwrap();
545                    stream.on_grpc_stream_close(token_id, status_code)
546                } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
547                    self.active_id.set(context_id);
548                    hostcalls::set_effective_context(context_id).unwrap();
549                    root.on_grpc_stream_close(token_id, status_code)
550                }
551            } else {
552                // TODO: change back to a panic once underlying issue is fixed.
553                trace!("on_grpc_close: invalid token_id, a non-connected stream has closed");
554            }
555        }
556    }
557
558    fn on_foreign_function(&self, context_id: u32, function_id: u32, arugments_size: usize) {
559        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
560            self.active_id.set(context_id);
561            hostcalls::set_effective_context(context_id).unwrap();
562            http_stream.on_foreign_function(function_id, arugments_size)
563        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
564            self.active_id.set(context_id);
565            hostcalls::set_effective_context(context_id).unwrap();
566            stream.on_foreign_function(function_id, arugments_size)
567        } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
568            self.active_id.set(context_id);
569            hostcalls::set_effective_context(context_id).unwrap();
570            root.on_foreign_function(function_id, arugments_size)
571        }
572    }
573}
574
575#[no_mangle]
576pub extern "C" fn proxy_on_context_create(context_id: u32, root_context_id: u32) {
577    DISPATCHER.with(|dispatcher| dispatcher.on_create_context(context_id, root_context_id))
578}
579
580#[no_mangle]
581pub extern "C" fn proxy_on_done(context_id: u32) -> bool {
582    DISPATCHER.with(|dispatcher| dispatcher.on_done(context_id))
583}
584
585#[no_mangle]
586pub extern "C" fn proxy_on_log(context_id: u32) {
587    DISPATCHER.with(|dispatcher| dispatcher.on_log(context_id))
588}
589
590#[no_mangle]
591pub extern "C" fn proxy_on_delete(context_id: u32) {
592    DISPATCHER.with(|dispatcher| dispatcher.on_delete(context_id))
593}
594
595#[no_mangle]
596pub extern "C" fn proxy_on_vm_start(context_id: u32, vm_configuration_size: usize) -> bool {
597    DISPATCHER.with(|dispatcher| dispatcher.on_vm_start(context_id, vm_configuration_size))
598}
599
600#[no_mangle]
601pub extern "C" fn proxy_on_configure(context_id: u32, plugin_configuration_size: usize) -> bool {
602    DISPATCHER.with(|dispatcher| dispatcher.on_configure(context_id, plugin_configuration_size))
603}
604
605#[no_mangle]
606pub extern "C" fn proxy_on_tick(context_id: u32) {
607    DISPATCHER.with(|dispatcher| dispatcher.on_tick(context_id))
608}
609
610#[no_mangle]
611pub extern "C" fn proxy_on_queue_ready(context_id: u32, queue_id: u32) {
612    DISPATCHER.with(|dispatcher| dispatcher.on_queue_ready(context_id, queue_id))
613}
614
615#[no_mangle]
616pub extern "C" fn proxy_on_new_connection(context_id: u32) -> Action {
617    DISPATCHER.with(|dispatcher| dispatcher.on_new_connection(context_id))
618}
619
620#[no_mangle]
621pub extern "C" fn proxy_on_downstream_data(
622    context_id: u32,
623    data_size: usize,
624    end_of_stream: bool,
625) -> Action {
626    DISPATCHER
627        .with(|dispatcher| dispatcher.on_downstream_data(context_id, data_size, end_of_stream))
628}
629
630#[no_mangle]
631pub extern "C" fn proxy_on_downstream_connection_close(context_id: u32, peer_type: PeerType) {
632    DISPATCHER.with(|dispatcher| dispatcher.on_downstream_close(context_id, peer_type))
633}
634
635#[no_mangle]
636pub extern "C" fn proxy_on_upstream_data(
637    context_id: u32,
638    data_size: usize,
639    end_of_stream: bool,
640) -> Action {
641    DISPATCHER.with(|dispatcher| dispatcher.on_upstream_data(context_id, data_size, end_of_stream))
642}
643
644#[no_mangle]
645pub extern "C" fn proxy_on_upstream_connection_close(context_id: u32, peer_type: PeerType) {
646    DISPATCHER.with(|dispatcher| dispatcher.on_upstream_close(context_id, peer_type))
647}
648
649#[no_mangle]
650pub extern "C" fn proxy_on_request_headers(
651    context_id: u32,
652    num_headers: usize,
653    end_of_stream: bool,
654) -> Action {
655    DISPATCHER.with(|dispatcher| {
656        dispatcher.on_http_request_headers(context_id, num_headers, end_of_stream)
657    })
658}
659
660#[no_mangle]
661pub extern "C" fn proxy_on_request_body(
662    context_id: u32,
663    body_size: usize,
664    end_of_stream: bool,
665) -> Action {
666    DISPATCHER
667        .with(|dispatcher| dispatcher.on_http_request_body(context_id, body_size, end_of_stream))
668}
669
670#[no_mangle]
671pub extern "C" fn proxy_on_request_trailers(context_id: u32, num_trailers: usize) -> Action {
672    DISPATCHER.with(|dispatcher| dispatcher.on_http_request_trailers(context_id, num_trailers))
673}
674
675#[no_mangle]
676pub extern "C" fn proxy_on_response_headers(
677    context_id: u32,
678    num_headers: usize,
679    end_of_stream: bool,
680) -> Action {
681    DISPATCHER.with(|dispatcher| {
682        dispatcher.on_http_response_headers(context_id, num_headers, end_of_stream)
683    })
684}
685
686#[no_mangle]
687pub extern "C" fn proxy_on_response_body(
688    context_id: u32,
689    body_size: usize,
690    end_of_stream: bool,
691) -> Action {
692    DISPATCHER
693        .with(|dispatcher| dispatcher.on_http_response_body(context_id, body_size, end_of_stream))
694}
695
696#[no_mangle]
697pub extern "C" fn proxy_on_response_trailers(context_id: u32, num_trailers: usize) -> Action {
698    DISPATCHER.with(|dispatcher| dispatcher.on_http_response_trailers(context_id, num_trailers))
699}
700
701#[no_mangle]
702pub extern "C" fn proxy_on_http_call_response(
703    _context_id: u32,
704    token_id: u32,
705    num_headers: usize,
706    body_size: usize,
707    num_trailers: usize,
708) {
709    DISPATCHER.with(|dispatcher| {
710        dispatcher.on_http_call_response(token_id, num_headers, body_size, num_trailers)
711    })
712}
713
714#[no_mangle]
715pub extern "C" fn proxy_on_grpc_receive_initial_metadata(
716    _context_id: u32,
717    token_id: u32,
718    headers: u32,
719) {
720    DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_initial_metadata(token_id, headers))
721}
722
723#[no_mangle]
724pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, response_size: usize) {
725    DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive(token_id, response_size))
726}
727
728#[no_mangle]
729pub extern "C" fn proxy_on_grpc_receive_trailing_metadata(
730    _context_id: u32,
731    token_id: u32,
732    trailers: u32,
733) {
734    DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_trailing_metadata(token_id, trailers))
735}
736
737#[no_mangle]
738pub extern "C" fn proxy_on_grpc_close(_context_id: u32, token_id: u32, status_code: u32) {
739    DISPATCHER.with(|dispatcher| dispatcher.on_grpc_close(token_id, status_code))
740}
741
742#[no_mangle]
743pub extern "C" fn proxy_on_foreign_function(
744    context_id: u32,
745    function_id: u32,
746    arguments_size: usize,
747) {
748    DISPATCHER
749        .with(|dispatcher| dispatcher.on_foreign_function(context_id, function_id, arguments_size))
750}