1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#[cfg(test)]
mod test;

use crate::core::message::CallFunctionResult;
use crate::core::{Serialize, SerializedValue, ServiceId};
use crate::{Error, Handle};
use futures_channel::mpsc::UnboundedReceiver;
use futures_core::stream::{FusedStream, Stream};
use std::future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Owned service on the bus.
///
/// [`Service`s](Service) are associated with an [`Object`](crate::Object) and created with
/// [`Object::create_service`](crate::Object::create_service). [`Service`s](Service) can be
/// destroyed again by calling [`destroy`](Service::destroy), by dropping them, or implicitly when
/// the [`Object`](crate::Object) is destroyed.
///
/// [`Service`] exposes an asynchronous stream of incoming [`FunctionCall`s](FunctionCall) via its
/// implementation of the [`Stream`] trait.
///
/// Events can be emitted directly with [`Handle::emit_event`]. This is available on [`Handle`],
/// because usually [`Service`] is borrowed mutably to wait for function calls. The [`ServiceId`]
/// required for [`Handle::emit_event`] can be obtained with e.g. [`Service::id`].
///
/// # Examples
///
/// Creating and destroying [`Service`s](Service):
///
/// ```
/// use aldrin::Error;
/// use aldrin::core::{ObjectUuid, ServiceUuid};
/// use std::mem;
/// use uuid::uuid;
///
/// const SERVICE_UUID: ServiceUuid = ServiceUuid(uuid!("f88f1706-9609-42a4-8796-4e7bb8c3ef24"));
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let broker = aldrin_test::tokio::TestBroker::new();
/// # let handle = broker.add_client().await;
/// let object = handle.create_object(ObjectUuid::new_v4()).await?;
///
/// // Create a service and destroy it again explicitly:
/// let service = object.create_service(SERVICE_UUID, 1).await?;
/// service.destroy().await?;
///
/// // Destroy a service implicitly by dropping it:
/// let service = object.create_service(SERVICE_UUID, 1).await?;
/// mem::drop(service);
///
/// // Destroy a service implicitly by dropping the object:
/// let service = object.create_service(SERVICE_UUID, 1).await?;
/// let service_id = service.id();
/// mem::drop(object);
/// assert_eq!(service.destroy().await, Err(Error::InvalidService));
/// # Ok(())
/// # }
/// ```
///
/// The following is a small chat server example, which shows how to handle function calls on a
/// service and how to emit events.
///
/// ```
/// use aldrin::core::{ObjectUuid, ServiceUuid};
/// use std::collections::HashSet;
/// use uuid::uuid;
///
/// const CHAT_UUID: ServiceUuid = ServiceUuid(uuid!("91334d42-7045-4292-99dc-9fd89c5f104f"));
///
/// // Functions
/// const SHUTDOWN: u32 = 1;
/// const JOIN_CHAT: u32 = 2;
/// const LEAVE_CHAT: u32 = 3;
/// const LIST_USERS: u32 = 4;
/// const SEND_MESSAGE: u32 = 5;
///
/// // Events
/// const JOINED_CHAT: u32 = 1;
/// const LEFT_CHAT: u32 = 2;
/// const MESSAGE_SENT: u32 = 3;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let broker = aldrin_test::tokio::TestBroker::new();
/// # let handle = broker.add_client().await;
/// // Create object and our chat service:
/// let object = handle.create_object(ObjectUuid::new_v4()).await?;
/// let mut service = object.create_service(CHAT_UUID, 1).await?;
/// let service_id = service.id();
///
/// // HashSet to keep track of users:
/// let mut users = HashSet::new();
///
/// # handle.call_function::<_, (), ()>(service.id(), SHUTDOWN, &())?;
/// // Iterate (asynchronously) over incoming function calls. `func` is of type `FunctionCall`,
/// // which contains the function's id, the arguments, and a reply object.
/// while let Some(func) = service.next_function_call().await {
///     match func.id {
///         SHUTDOWN => break,
///
///         JOIN_CHAT => {
///             let name: String = func.args.deserialize()?;
///             if users.insert(name.clone()) {
///                 // Emit an event that a new user with the given name joined:
///                 handle.emit_event(service_id, JOINED_CHAT, &name)?;
///
///                 func.reply.ok(&())?;
///             } else {
///                 // Signal that the name is already taken.
///                 func.reply.err(&())?;
///             }
///         }
///
///         LEAVE_CHAT => {
///             let name: String = func.args.deserialize()?;
///             if users.remove(&name) {
///                 // Emit an event that a user with the given name left:
///                 handle.emit_event(service_id, LEFT_CHAT, &name)?;
///
///                 func.reply.ok(&())?;
///             } else {
///                 // Signal that the name is not known.
///                 func.reply.err(&())?;
///             }
///         }
///
///         LIST_USERS => func.reply.ok(&users)?,
///
///         SEND_MESSAGE => {
///             // Broadcast the message:
///             let message = func.args.deserialize()?;
///             handle.emit_event(service_id, MESSAGE_SENT, &message)?;
///             func.reply.ok(&())?;
///         }
///
///         _ => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Service {
    /// Id of this service; returned by [`Service::id`] and used when destroying the service.
    id: ServiceId,
    /// Version of this service; returned by [`Service::version`].
    version: u32,
    /// Client handle used to destroy the service and cloned into every [`FunctionCall`]'s reply.
    client: Handle,
    /// Receiving end of the channel that delivers incoming raw function calls.
    function_calls: UnboundedReceiver<RawFunctionCall>,
}

impl Service {
    /// Assembles a [`Service`] from its parts.
    ///
    /// `function_calls` is the receiving end of the channel from which
    /// [`poll_next_function_call`](Service::poll_next_function_call) pulls incoming calls.
    pub(crate) fn new(
        id: ServiceId,
        version: u32,
        client: Handle,
        function_calls: UnboundedReceiver<RawFunctionCall>,
    ) -> Self {
        Self {
            id,
            version,
            client,
            function_calls,
        }
    }

    /// Returns the id of the service.
    pub fn id(&self) -> ServiceId {
        self.id
    }

    /// Returns the version of the service.
    pub fn version(&self) -> u32 {
        self.version
    }

    /// Returns a handle to the client that was used to create the service.
    pub fn handle(&self) -> &Handle {
        &self.client
    }

    /// Destroys the service.
    ///
    /// # Errors
    ///
    /// If the [`Service`] has already been destroyed, then [`Error::InvalidService`] is returned.
    pub async fn destroy(&self) -> Result<(), Error> {
        let id = self.id;
        self.client.destroy_service(id).await
    }

    /// Polls for the next function call.
    ///
    /// Whatever the underlying channel reports is passed through unchanged, except that a
    /// received raw call is turned into a [`FunctionCall`] carrying a clone of this service's
    /// client handle for its reply.
    pub fn poll_next_function_call(&mut self, cx: &mut Context) -> Poll<Option<FunctionCall>> {
        match Pin::new(&mut self.function_calls).poll_next(cx) {
            Poll::Ready(Some(raw)) => {
                let call =
                    FunctionCall::new(raw.function, raw.value, self.client.clone(), raw.serial);
                Poll::Ready(Some(call))
            }

            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Returns the next function call.
    ///
    /// This is the `async` counterpart of
    /// [`poll_next_function_call`](Service::poll_next_function_call) and resolves to `None` when
    /// that method yields `None`.
    pub async fn next_function_call(&mut self) -> Option<FunctionCall> {
        let next_call = future::poll_fn(|cx| self.poll_next_function_call(cx));
        next_call.await
    }
}

impl Drop for Service {
    /// Destroys the service when the owning value goes out of scope.
    fn drop(&mut self) {
        // `drop` cannot be async, so the non-awaiting variant of the destroy call is used here.
        let id = self.id;
        self.client.destroy_service_now(id);
    }
}

impl Stream for Service {
    type Item = FunctionCall;

    /// Forwards to [`Service::poll_next_function_call`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let service = self.get_mut();
        service.poll_next_function_call(cx)
    }
}

impl FusedStream for Service {
    fn is_terminated(&self) -> bool {
        self.function_calls.is_terminated()
    }
}

/// Function call received by a service.
///
/// [`FunctionCall`s](FunctionCall) can be received with the [`Stream`] implementation of
/// [`Service`] or with [`Service::next_function_call`].
///
/// A call is answered by consuming its [`reply`](FunctionCall::reply) field with one of the
/// methods of [`FunctionCallReply`].
///
/// See [`Service`] for usage examples.
#[derive(Debug)]
pub struct FunctionCall {
    /// Id of the called function.
    pub id: u32,

    /// Arguments passed to called function.
    ///
    /// The value is still serialized when the call is received.
    pub args: SerializedValue,

    /// Reply object, used to set the return value of the function call.
    pub reply: FunctionCallReply,
}

impl FunctionCall {
    /// Creates a [`FunctionCall`] for the function `id` with the serialized arguments `args`.
    ///
    /// `client` and `serial` are handed to the [`FunctionCallReply`] that is stored in
    /// [`reply`](FunctionCall::reply).
    pub(crate) fn new(id: u32, args: SerializedValue, client: Handle, serial: u32) -> Self {
        let reply = FunctionCallReply::new(client, serial);
        Self { id, args, reply }
    }
}

/// Helper used to reply to a pending service function call.
///
/// Every [`FunctionCall`] contains a [`FunctionCallReply`]. It can be used once to set the return
/// value of the function call.
///
/// When [`FunctionCallReply`] is dropped (as opposed to consumed by one of its methods),
/// [`abort`](FunctionCallReply::abort) will be called implicitly.
#[derive(Debug)]
pub struct FunctionCallReply {
    /// Client handle used to send the reply.
    ///
    /// This is `Some` until a reply has been sent. The replying methods take the handle out, so
    /// that the `Drop` implementation does not send an additional abort afterwards.
    client: Option<Handle>,

    /// Serial that is passed along with the reply to identify the function call.
    serial: u32,
}

impl FunctionCallReply {
    pub(crate) fn new(client: Handle, serial: u32) -> Self {
        FunctionCallReply {
            client: Some(client),
            serial,
        }
    }

    /// Sets the function call's reply.
    pub fn set<T, E>(self, res: Result<&T, &E>) -> Result<(), Error>
    where
        T: Serialize + ?Sized,
        E: Serialize + ?Sized,
    {
        match res {
            Ok(value) => self.ok(value),
            Err(value) => self.err(value),
        }
    }

    /// Signals that the function call was successful.
    pub fn ok<T: Serialize + ?Sized>(mut self, value: &T) -> Result<(), Error> {
        let res = CallFunctionResult::ok_with_serialize_value(value)?;
        self.client
            .take()
            .unwrap()
            .function_call_reply(self.serial, res)
    }

    /// Signals that the function call has failed.
    pub fn err<T: Serialize + ?Sized>(mut self, value: &T) -> Result<(), Error> {
        let res = CallFunctionResult::err_with_serialize_value(value)?;
        self.client
            .take()
            .unwrap()
            .function_call_reply(self.serial, res)
    }

    /// Aborts the function call.
    ///
    /// The caller will be still be notified that the call was aborted.
    pub fn abort(mut self) -> Result<(), Error> {
        self.client
            .take()
            .unwrap()
            .function_call_reply(self.serial, CallFunctionResult::Aborted)
    }

    /// Signals that an invalid function has been called.
    ///
    /// The function, as identified by [`FunctionCall::id`], may be invalid or unexpected by the
    /// service.
    pub fn invalid_function(mut self) -> Result<(), Error> {
        self.client
            .take()
            .unwrap()
            .function_call_reply(self.serial, CallFunctionResult::InvalidFunction)
    }

    /// Signals that invalid arguments were passed to the function.
    pub fn invalid_args(mut self) -> Result<(), Error> {
        self.client
            .take()
            .unwrap()
            .function_call_reply(self.serial, CallFunctionResult::InvalidArgs)
    }
}

impl Drop for FunctionCallReply {
    /// Aborts the function call if no reply has been sent.
    fn drop(&mut self) {
        // The handle is gone once a reply has been sent; only an unused reply object aborts.
        if let Some(client) = self.client.take() {
            // Errors cannot be reported from `drop`, so the result is deliberately discarded.
            let _ = client.function_call_reply(self.serial, CallFunctionResult::Aborted);
        }
    }
}

/// Function call as delivered to a [`Service`] through its channel.
///
/// [`Service::poll_next_function_call`] converts it into a [`FunctionCall`].
#[derive(Debug)]
pub(crate) struct RawFunctionCall {
    /// Serial of the call; ends up in the [`FunctionCallReply`] of the resulting call.
    pub serial: u32,
    /// Id of the called function; becomes [`FunctionCall::id`].
    pub function: u32,
    /// Serialized arguments of the call; becomes [`FunctionCall::args`].
    pub value: SerializedValue,
}