1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use core::future::Future;
use std::{collections::HashMap, pin::Pin, sync::Arc};

use crate::stream_protocol::Generator;

/// General type returned by every procedure: a heap-allocated, pinned, `Send` future that
/// resolves to `T`.
///
/// All the `*Response` aliases in this module are instantiations of this type.
pub type Response<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// Payload for procedures which don't receive a stream.
///
/// It is handed to the handler as a single owned byte buffer.
// NOTE(review): the encoding of the bytes is not defined here — presumably decoded by the
// generated service code; confirm against the code generator.
pub type CommonPayload = Vec<u8>;

/// Response type returned by a unary procedure: a [`Response`] future that resolves to a
/// single byte buffer.
pub type UnaryResponse = Response<Vec<u8>>;

/// Handler type for a unary procedure.
///
/// The handler receives the [`CommonPayload`] and an `Arc` to the service `Context`, and
/// returns a [`UnaryResponse`].
///
/// This is an unsized `dyn Fn` trait-object type, so it has to live behind a pointer
/// (this module stores it in an `Arc`). The `Send + Sync` bounds are part of the type.
pub type UnaryRequestHandler<Context> =
    dyn Fn(CommonPayload, Arc<Context>) -> UnaryResponse + Send + Sync;

/// Response type returned by a server streams procedure: a [`Response`] future that resolves
/// to a [`Generator`] of byte buffers.
// NOTE(review): `Generator` comes from `crate::stream_protocol`; presumably it yields the
// streamed messages one at a time — confirm there.
pub type ServerStreamsResponse = Response<Generator<Vec<u8>>>;

/// Handler type for a server streams procedure.
///
/// The handler receives the [`CommonPayload`] and an `Arc` to the service `Context`, and
/// returns a [`ServerStreamsResponse`].
///
/// This is an unsized `dyn Fn` trait-object type, so it has to live behind a pointer
/// (this module stores it in an `Arc`). The `Send + Sync` bounds are part of the type.
pub type ServerStreamsRequestHandler<Context> =
    dyn Fn(CommonPayload, Arc<Context>) -> ServerStreamsResponse + Send + Sync;

/// Payload type that a client streams procedure receives: a [`Generator`] of byte buffers
/// instead of a single buffer.
pub type ClientStreamsPayload = Generator<Vec<u8>>;

/// Response type returned by a client streams procedure: a [`Response`] future that resolves
/// to a single byte buffer.
pub type ClientStreamsResponse = Response<Vec<u8>>;

/// Handler type for a client streams procedure.
///
/// The handler receives the [`ClientStreamsPayload`] and an `Arc` to the service `Context`,
/// and returns a [`ClientStreamsResponse`].
///
/// This is an unsized `dyn Fn` trait-object type, so it has to live behind a pointer
/// (this module stores it in an `Arc`). The `Send + Sync` bounds are part of the type.
pub type ClientStreamsRequestHandler<Context> =
    dyn Fn(ClientStreamsPayload, Arc<Context>) -> ClientStreamsResponse + Send + Sync;

/// Payload type that a bidirectional streams procedure receives: a [`Generator`] of byte
/// buffers.
pub type BiStreamsPayload = Generator<Vec<u8>>;

/// Response type returned by a bidirectional streams procedure: a [`Response`] future that
/// resolves to a [`Generator`] of byte buffers.
pub type BiStreamsResponse = Response<Generator<Vec<u8>>>;

/// Handler type for a bidirectional streams procedure.
///
/// The handler receives the [`BiStreamsPayload`] and an `Arc` to the service `Context`, and
/// returns a [`BiStreamsResponse`].
///
/// This is an unsized `dyn Fn` trait-object type, so it has to live behind a pointer
/// (this module stores it in an `Arc`). The `Send + Sync` bounds are part of the type.
pub type BiStreamsRequestHandler<Context> =
    dyn Fn(BiStreamsPayload, Arc<Context>) -> BiStreamsResponse + Send + Sync;

/// Type used for storing procedure definitions given by the code generation for the RPC
/// service.
///
/// There is one variant per kind of procedure. Each variant holds its handler as an
/// `Arc`-wrapped trait object of the matching `*RequestHandler` type.
pub enum Definition<Context> {
    /// Stores a unary procedure handler (single payload in, single response out)
    Unary(Arc<UnaryRequestHandler<Context>>),
    /// Stores a server streams procedure handler (single payload in, generator out)
    ServerStreams(Arc<ServerStreamsRequestHandler<Context>>),
    /// Stores a client streams procedure handler (generator in, single response out)
    ClientStreams(Arc<ClientStreamsRequestHandler<Context>>),
    /// Stores a bidirectional streams procedure handler (generator in, generator out)
    BiStreams(Arc<BiStreamsRequestHandler<Context>>),
}

/// Stores all procedures defined for an RPC service.
///
/// Procedures are registered through the `add_*` methods and read back through
/// `get_definitions`.
pub struct ServiceModuleDefinition<Context> {
    /// Map that stores all procedures for the service.
    ///
    /// The key is the procedure's name and the value is the handler for the procedure,
    /// wrapped in the [`Definition`] variant that matches its kind.
    definitions: HashMap<String, Definition<Context>>,
}

impl<Context> ServiceModuleDefinition<Context> {
    pub fn new() -> Self {
        Self {
            definitions: HashMap::new(),
        }
    }

    /// Add an unary procedure handler to the service definition
    pub fn add_unary<
        H: Fn(CommonPayload, Arc<Context>) -> UnaryResponse + Send + Sync + 'static,
    >(
        &mut self,
        name: &str,
        handler: H,
    ) {
        self.add_definition(name, Definition::Unary(Arc::new(handler)));
    }

    /// Add a server streams procedure handler to the service definition
    pub fn add_server_streams<
        H: Fn(CommonPayload, Arc<Context>) -> ServerStreamsResponse + Send + Sync + 'static,
    >(
        &mut self,
        name: &str,
        handler: H,
    ) {
        self.add_definition(name, Definition::ServerStreams(Arc::new(handler)));
    }

    /// Add a client streams procedure handler to the service definition
    pub fn add_client_streams<
        H: Fn(ClientStreamsPayload, Arc<Context>) -> ClientStreamsResponse + Send + Sync + 'static,
    >(
        &mut self,
        name: &str,
        handler: H,
    ) {
        self.add_definition(name, Definition::ClientStreams(Arc::new(handler)));
    }

    /// Add a bidirectional streams procedure handler to the service definition
    pub fn add_bidir_streams<
        H: Fn(BiStreamsPayload, Arc<Context>) -> BiStreamsResponse + Send + Sync + 'static,
    >(
        &mut self,
        name: &str,
        handler: H,
    ) {
        self.add_definition(name, Definition::BiStreams(Arc::new(handler)));
    }

    fn add_definition(&mut self, name: &str, definition: Definition<Context>) {
        self.definitions.insert(name.to_string(), definition);
    }

    pub fn get_definitions(&self) -> &HashMap<String, Definition<Context>> {
        &self.definitions
    }
}

impl<Context> Default for ServiceModuleDefinition<Context> {
    fn default() -> Self {
        Self::new()
    }
}