1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
/// [JetStream](https://docs.nats.io/nats-concepts/jetstream) support.
/// [Key/Value Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) support.
/// Define the message struct using inside `ame-bus`.
/// Tokio concurrency utilities.
/// Service RPC support. Using just NATS core features.
pub use ;
/// # Specify the JetStream using of the struct.
/// Usually used for a consumer or message in JetStream.
///
/// ## Example:
///
/// ```rust
/// # use ame_bus_macros::jetstream;
///
/// #[jetstream(
/// name = "user",
/// description = "User successful registered event",
/// )]
/// pub struct UserSuccessfulRegistered {
/// pub user_id: String,
/// pub email: String,
/// }
/// ```
///
/// ## Supported attributes
///
/// - `name="foo"` (required): Stream name. Must not have spaces, tabs or period . characters.
/// - `description="foo"`: Stream description.
/// - `max_messages=100_000`: Maximum number of messages to keep in the stream.
/// - `max_bytes=1024`: Maximum bytes of messages
/// - `no_ack`: Disables acknowledging messages that are received by the Stream.
///
/// Attention that these options only work if the stream is not created yet and the stream
/// is created with these options.
pub use jetstream;
/// # Configure the JetStream consumer.
///
/// Must implement [NatsJetStreamMeta](crate::jetstream::NatsJetStreamMeta) trait first.
///
/// ## Example
///
/// ```rust
/// # use ame_bus_macros::{jetstream, jetstream_consumer};
///
/// #[jetstream(
/// name = "user",
/// description = "User successful registered event consumer",
/// )]
/// #[jetstream_consumer(
/// name = "user-successful-registered-consumer",
/// durable,
/// filter_subject = "user.registered",
/// )]
/// pub struct UserSuccessfulRegisteredConsumer {
/// database_connection: (), // use `()` for example, should be a real connection
/// }
/// ```
///
/// ## Supported attributes
///
/// - `name="foo"`: Consumer name. If the consumer is durable, it will also be the durable name. Default to the struct name in snake case.
/// - `push`: Use push-based consumer. Cannot be used with `pull`.
/// - `pull`: Use pull-based consumer. Cannot be used with `push`. Default if neither `push` nor `pull` is specified.
/// - `durable`: Create a durable consumer. Default is false.
/// - `durable_name="foo"`: Durable name. If not specified, the consumer name is used.
/// - `deliver_policy="all"|"last"|"new"|"last_per_subject"`: Delivery policy. Default is `all`.
/// - `ack_policy="explicit"|"all"|"none"`: Acknowledgement policy. Default is `explicit`.
/// - `ack_wait_secs=10`: Acknowledgement wait time in seconds.
/// - `filter_subject="foo.bar"`: Filter messages by subject. Allow wildcards.
/// - `headers_only`: Let payload be empty and only headers are delivered.
///
/// only with pull consumer:
/// - `max_batch=10`: Maximum number of messages to fetch in a batch.
///
/// only with push consumer:
/// - `deliver_subject="foo.bar"`: Subject to deliver messages to.
/// - `deliver_group="foo"`: Consumer group to deliver messages to.
///
/// ## Example:
///
/// ```rust
/// # use ame_bus_macros::*;
///
/// #[jetstream(name = "mail", description = "Mail service")]
/// #[jetstream_consumer(pull, durable, filter_subject="mail.send")]
/// struct EmailSendEventConsumer {
/// smtp_connection: (),
/// email_template: String,
/// }
/// ```
pub use jetstream_consumer;
/// Implement `NatsJsonMessage` trait if it has already implemented `Serialize` and `Deserialize` traits.
///
/// example:
/// ```rust
/// # use serde::{Deserialize, Serialize};
/// # use ame_bus_macros::NatsJsonMessage;
///
/// #[derive(Serialize, Deserialize, NatsJsonMessage)]
/// pub struct User {
/// pub id: String,
/// pub name: String,
/// }
/// ```
pub use NatsJsonMessage;
/// # RPC Service
///
/// Mark a struct as an RPC service.
///
/// ## Example
/// ```rust
/// # use ame_bus_macros::rpc_service;
/// #[rpc_service(
/// name = "user.info",
/// version = "0.1.0",
/// )]
/// pub struct UserInfoService {
/// // fields, like database connection
/// }
/// ```
///
/// ## Supported attributes
///
/// - `name="foo"` (required): Service name. Can be a NATS path.
/// - `description="foo"`: Service description.
/// - `version="0.1.0"`: Service version. Default is `0.1.0`.
/// - `queue_group="foo"`: Queue group name.
///
/// Usually, you need to set the `queue_group` to make the service scaled properly.
pub use rpc_service;
/// # RPC Route Register
///
/// Register the route, and implement the [PooledApp](pool::PooledApp) trait.
///
/// ## Usage
///
/// Use an enum as route table, mark the enum with `#[rpc_route()]` attribute.
///
/// `#[rpc_route()]` must have these args:
///
/// - `service`: The service struct name.
/// - `nats_connection`: The NATS connection, should be `&async_nats::Client`.
///
/// *To avoid lifetime issue, use `&'static async_nats::Client` with `OnceCell<Client>` is suggested.*
///
/// To register the requests, each variant in enum must have `#[rpc_endpoint()]` attribute.
///
/// `#[rpc_endpoint(request = "RequestName")]` must have these args:
///
/// - `request`: the request, must implement [NatsRpcRequest](crate::service_rpc::NatsRpcRequest) trait.
///
/// ## Example
///
/// ```rust
/// # use ame_bus_macros::*;
/// # use ame_bus::service_rpc::NatsRpcRequestMeta;
/// use tokio::sync::OnceCell;
/// use serde::{Deserialize, Serialize};
/// use ame_bus::service_rpc::NatsRpcRequest;
///
/// // don't forget to set up the NATS connection
/// static NATS_CONNECTION: OnceCell<async_nats::Client> = OnceCell::const_new();
///
/// #[rpc_service(
/// name = "user.info",
/// version = "0.1.0",
/// )]
/// pub struct UserInfoService {
/// // fields, like database connection
/// }
///
/// #[derive(Debug, Clone, Serialize, Deserialize, NatsJsonMessage)]
/// struct UserAvatarReq {
/// user_id: String,
/// }
///
/// # impl NatsRpcRequestMeta for UserAvatarReq {
/// # const ENDPOINT_NAME: &'static str = "avatar";
/// # type Service = UserInfoService;
/// # }
///
/// #[async_trait::async_trait]
/// impl NatsRpcRequest for UserAvatarReq {
/// type Response = ();
///
/// async fn process_request(service_state: &Self::Service, request: Self) -> anyhow::Result<Self::Response> {
/// Ok(())
/// }
/// }
///
/// #[derive(Debug, Clone, Serialize, Deserialize, NatsJsonMessage)]
/// struct UserMetaReq {
/// user_id: String,
/// }
///
/// # impl NatsRpcRequestMeta for UserMetaReq {
/// # const ENDPOINT_NAME: &'static str = "meta";
/// # type Service = UserInfoService;
/// # }
///
/// #[async_trait::async_trait]
/// impl NatsRpcRequest for UserMetaReq {
/// type Response = ();
/// async fn process_request(service_state: &Self::Service, request: Self) -> anyhow::Result<Self::Response> {
/// Ok(())
/// }
/// }
///
/// #[rpc_route(service="UserInfoService", nats_connection="NATS_CONNECTION.get().unwrap()")]
/// enum UserInfoRoute {
/// #[rpc_endpoint(request="UserAvatarReq")]
/// UserAvatar,
/// #[rpc_endpoint(request="UserMetaReq")]
/// UserMeta,
/// }
/// ```
pub use rpc_route;
pub use tracing;
pub use futures;