1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// Copyright 2020 Kodebox, Inc.
// This file is part of CodeChain.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.

use crate::packet::{PacketView, SlotType};
use crate::port::{client::Client, server::Server, BasicPort, Port};
use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
use crate::transport::{TransportRecv, TransportSend};
use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
use parking_lot::Mutex;
use std::sync::{Arc, Barrier, Weak};
use threadpool::ThreadPool;

mod meta_service {
    use super::*;
    /// This is required because of macro
    use crate as remote_trait_object;

    #[remote_trait_object_macro::service]
    pub trait MetaService: Service {
        fn firm_close(&self);
    }

    pub struct MetaServiceImpl {
        barrier: Arc<Barrier>,
    }

    impl MetaServiceImpl {
        pub fn new(barrier: Arc<Barrier>) -> Self {
            Self {
                barrier,
            }
        }
    }

    impl Service for MetaServiceImpl {}

    impl MetaService for MetaServiceImpl {
        fn firm_close(&self) {
            self.barrier.wait();
        }
    }
}
use meta_service::{MetaService, MetaServiceImpl};

/// A configuration of a `remote-trait-object` context.
#[derive(Clone, Debug)]
pub struct Config {
    /// A name that will be appended to the names of various threads spawned by `remote-trait-object`, for an easy debug.
    ///
    /// This can be helpful if you handle multiple contexts of `remote-trait-object`.
    pub name: String,

    /// Number of the maximum of concurrent calls.
    ///
    /// Value of this doesn't have anything to do with the number of threads that would be spawned.
    /// Having a large number of this wouldn't charge any cost except really small additional memory allocation.
    ///
    /// [`server_threads`]: ./struct.Config.html#field.server_threads
    pub call_slots: usize,

    /// A timeout for a remote method call.
    ///
    /// All remote method invocations through your proxy object and delete requests (that happens when you drop a proxy object)
    /// will have this timeout. If it exceeds, it will cause an error.
    ///
    /// Use `None` for to wait indefinitely.
    pub call_timeout: Option<std::time::Duration>,

    /// A maximum number of services that this context can export.
    pub maximum_services_num: usize,

    /// A shared instance of a thread pool that will be used in call handling
    ///
    /// A `remote-trait-object` context will use this thread pool to handle an incoming method call.
    /// Size of this pool determines the maximum number of concurrent calls that the context can handle.
    /// Note that this pool is wrapped in `Arc`, which means that it can be possibly shared with other places.
    pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
}

impl Config {
    pub fn default_setup() -> Self {
        Self {
            name: "my rto".to_owned(),
            call_slots: 512,
            maximum_services_num: 65536,
            call_timeout: Some(std::time::Duration::from_millis(1000)),

            thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
        }
    }
}

/// One end of a `remote-trait-object` connection.
///
/// If you establish a remote-trait-object connection,
/// there must be two ends and each will be provided as a `Context` to each user on both sides.
///
/// A context holds multiple things to function as a `remote-trait-object` connection end.
/// Since the connection is symmetric, it manages both _server_ and _client_ toward the other end.
/// It also manages a _registry_ that contains all exported services.
/// The server will look up the registry to find a target object for handling an incoming method invocation.
///
/// Note that `remote-trait-object` is a point-to-point connection protocol.
/// Exporting & importing a service are always performed on a specific connection,
/// which is toward the other side, or another instance of `Context`.
///
/// If you created an instance of this, that means you have a connection that has been successfully established **once**,
/// but is not guaranteed to be alive.
/// If the other end (or the other `Context`) is closed, most operations performed on `Context` will just cause an error.
pub struct Context {
    config: Config,
    multiplexer: Option<Multiplexer>,
    server: Option<Server>,
    port: Option<Arc<BasicPort>>,
    meta_service: Option<Box<dyn MetaService>>,
    firm_close_barrier: Arc<Barrier>,
}

impl std::fmt::Debug for Context {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Context").field("config", &self.config).finish()
    }
}

impl Context {
    /// Creates a new context without any initial services.
    ///
    /// If you decide to use this, you have to exchange raw [`HandleToExchange`] at least once using a secondary transportation means.
    /// It is really rarely needed, so please consider introducing an initializing service as an initial service, to avoid any raw exchange.
    ///
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
        config: Config,
        transport_send: S,
        transport_recv: R,
    ) -> Self {
        let null_to_export = crate::service::create_null_service();
        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
            Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export));
        ctx
    }

    /// Creates a new context only exporting a service, but importing nothing.
    ///
    /// The other end's context must be initialized with `with_initial_service_import()`.
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn with_initial_service_export<S: TransportSend + 'static, R: TransportRecv + 'static, A: ?Sized + Service>(
        config: Config,
        transport_send: S,
        transport_recv: R,
        initial_service: ServiceToExport<A>,
    ) -> Self {
        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
            Self::with_initial_service(config, transport_send, transport_recv, initial_service);
        ctx
    }

    /// Creates a new context only importing a service, but exporting nothing.
    ///
    /// The other end's context must be initialized with `with_initial_service_export()`.
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn with_initial_service_import<S: TransportSend + 'static, R: TransportRecv + 'static, B: ?Sized + Service>(
        config: Config,
        transport_send: S,
        transport_recv: R,
    ) -> (Self, ServiceToImport<B>) {
        let null_to_export = crate::service::create_null_service();
        let (ctx, import) =
            Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export));
        (ctx, import)
    }

    /// Creates a new context exchanging two services, one for export and one for import.
    ///
    /// It takes `initial_service` and registers in it, and passes a `HandleToExchange` internally. (_export_).
    /// Also it receives a `HandleToExchange` from the other side, and makes it into a proxy object. (_import_)
    ///
    /// The other end's context must be initialized with `with_initial_service()` as well, and
    /// such processes will be symmetric for both.
    ///
    /// [`HandleToExchange`]: ../raw_exchange/struct.HandleToExchange.html
    pub fn with_initial_service<
        S: TransportSend + 'static,
        R: TransportRecv + 'static,
        A: ?Sized + Service,
        B: ?Sized + Service,
    >(
        config: Config,
        transport_send: S,
        transport_recv: R,
        initial_service: ServiceToExport<A>,
    ) -> (Self, ServiceToImport<B>) {
        let firm_close_barrier = Arc::new(Barrier::new(2));

        let MultiplexResult {
            multiplexer,
            request_recv,
            response_recv,
        } = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
        let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;

        let client = Client::new(config.clone(), Arc::clone(&transport_send), Box::new(response_recv));
        let port = BasicPort::new(
            config.clone(),
            client,
            (Box::new(MetaServiceImpl::new(Arc::clone(&firm_close_barrier))) as Box<dyn MetaService>).into_skeleton(),
            initial_service.get_raw_export(),
        );
        let server = Server::new(config.clone(), port.get_registry(), transport_send, Box::new(request_recv));

        let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
        let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
            Weak::clone(&port_weak),
            HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
        );
        let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);

        let ctx = Context {
            config,
            multiplexer: Some(multiplexer),
            server: Some(server),
            port: Some(port),
            meta_service: Some(meta_service),
            firm_close_barrier,
        };
        let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
        (ctx, initial_service)
    }

    pub(crate) fn get_port(&self) -> Weak<dyn Port> {
        Arc::downgrade(&self.port.clone().expect("It becomes None only when the context is dropped.")) as Weak<dyn Port>
    }

    /// Clears all service objects in its registry.
    ///
    /// The most usual way of deleting a service object is dropping its proxy object on the client side, and letting it request a delete to the exporter side.
    /// However, in some cases (especially while you're trying to shut down the connection) it is useful to clear all exported service objects
    /// **by the exporter side itself**.
    ///
    /// Note that it will cause an error if the client side drops a proxy object of an already deleted (by this method) service object.
    /// Consider calling [`disable_garbage_collection()`] on the other end if there's such an issue.
    ///
    /// Note also that this might trigger _delete request_ as a side effect since the service object might own a proxy object.
    ///
    /// [`disable_garbage_collection()`]: ./struct.Context.html#method.disable_garbage_collection
    pub fn clear_service_registry(&mut self) {
        self.port.as_mut().unwrap().clear_registry();
    }

    /// Disables all _delete request_ from this end to the other end.
    ///
    /// If you call this, all `drop()` of proxy objects imported from this context won't send a delete request anymore.
    /// This is useful when you're not sure if the connection is still alive, but you have to close your side's context anyway.
    pub fn disable_garbage_collection(&self) {
        self.port.as_ref().expect("It becomes None only when the context is dropped.").set_no_drop();
    }

    /// Closes a context with a firm synchronization with the other end.
    ///
    /// If you call this method, it will block until the other end calls `firm_close()` too.
    /// This is useful when you want to assure that two ends never suffer from 'other end has been closed' error.
    /// If one of the contexts dropped too early, all remote calls (including delete request) from the other end will fail.
    /// To avoid such a situation, consider using this to stay alive as long as it is required.
    ///
    /// FIXME: currently it doesn't use `timeout` and blocks indefinitely.
    pub fn firm_close(self, _timeout: Option<std::time::Duration>) -> Result<(), Self> {
        let barrier = Arc::clone(&self.firm_close_barrier);
        let t = std::thread::spawn(move || {
            barrier.wait();
        });
        self.meta_service.as_ref().unwrap().firm_close();
        t.join().unwrap();

        Ok(())
    }
}

impl Drop for Context {
    /// This will delete all service objects after calling `disable_garbage_collection()` internally.
    fn drop(&mut self) {
        // We have to clean all registered service, as some might hold another proxy object inside, which refers this context's port.
        // For such case, we have to make them be dropped first before we unwrap the Arc<BasicPort>
        self.port.as_ref().unwrap().set_no_drop();
        self.port.as_ref().unwrap().clear_registry();
        drop(self.meta_service.take().unwrap());

        self.multiplexer.take().expect("It becomes None only when the context is dropped.").shutdown();
        // Shutdown server after multiplexer
        self.server.take().expect("It becomes None only when the context is dropped.").shutdown();
        // Shutdown port after multiplexer
        Arc::try_unwrap(self.port.take().expect("It becomes None only when the context is dropped."))
            .unwrap()
            .shutdown();
    }
}

pub struct PacketForward;

impl multiplex::Forward for PacketForward {
    fn forward(packet: PacketView) -> ForwardResult {
        match packet.slot().get_type() {
            SlotType::Request => ForwardResult::Request,
            SlotType::Response => ForwardResult::Response,
        }
    }
}