1use {
5 crate::domain::{DidKey, MessageId, SubscriptionId, Topic},
6 serde::{de::DeserializeOwned, Deserialize, Serialize},
7 std::{fmt::Debug, sync::Arc},
8};
9pub use {error::*, watch::*};
10
11pub mod error;
12pub mod msg_id;
13#[cfg(test)]
14mod tests;
15pub mod watch;
16
/// JSON-RPC protocol version string carried in the `jsonrpc` field of every
/// payload; the `validate` methods in this module compare against it.
pub const JSON_RPC_VERSION_STR: &str = "2.0";

/// Lazily-initialized, shared `Arc<str>` copy of [`JSON_RPC_VERSION_STR`].
/// Constructors clone this handle to fill the `jsonrpc` field.
pub static JSON_RPC_VERSION: once_cell::sync::Lazy<Arc<str>> =
    once_cell::sync::Lazy::new(|| Arc::from(JSON_RPC_VERSION_STR));

/// Largest batch accepted by the batch subscribe/unsubscribe validators;
/// bigger batches are rejected with `PayloadError::BatchLimitExceeded`.
pub const MAX_SUBSCRIPTION_BATCH_SIZE: usize = 500;

/// Largest number of topics accepted by [`BatchFetchMessages`] validation.
pub const MAX_FETCH_BATCH_SIZE: usize = 500;

/// Largest number of receipts accepted by [`BatchReceiveMessages`] validation.
pub const MAX_RECEIVE_BATCH_SIZE: usize = 500;
37
/// Marker trait bundling the bounds required of every value that travels in
/// an RPC payload: debuggable, cloneable, comparable, (de)serializable with
/// serde, and safe to share across threads with a `'static` lifetime.
pub trait Serializable:
    Debug + Clone + PartialEq + Eq + Serialize + DeserializeOwned + Send + Sync + 'static
{
}
// Blanket implementation: any type satisfying the bounds is `Serializable`,
// so the trait never needs to be implemented by hand.
impl<T> Serializable for T where
    T: Debug + Clone + PartialEq + Eq + Serialize + DeserializeOwned + Send + Sync + 'static
{
}
46
/// Contract implemented by every request parameter type in this module.
///
/// Ties a request to the error and response types it can produce, and to the
/// [`Params`] variant used to send it.
pub trait ServiceRequest: Serializable {
    /// Error type this request can fail with.
    type Error: ServiceError;

    /// Type of the successful response to this request.
    type Response: Serializable;

    /// Validates the request parameters.
    ///
    /// The default implementation accepts everything; implementors override
    /// it to reject malformed parameters with a [`PayloadError`].
    fn validate(&self) -> Result<(), PayloadError> {
        Ok(())
    }

    /// Consumes the request and wraps it in the matching [`Params`] variant.
    fn into_params(self) -> Params;
}
63
/// Top-level JSON-RPC payload: either a request or a response.
///
/// `#[serde(untagged)]` means no discriminant is written; the variant is
/// selected from the shape of the data on deserialization.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Payload {
    /// An inbound or outbound request.
    Request(Request),

    /// A response (successful or error) to a request.
    Response(Response),
}
74
75impl Payload {
76 pub fn id(&self) -> MessageId {
78 match self {
79 Self::Request(req) => req.id,
80 Self::Response(Response::Success(r)) => r.id,
81 Self::Response(Response::Error(r)) => r.id,
82 }
83 }
84
85 pub fn validate(&self) -> Result<(), PayloadError> {
86 match self {
87 Self::Request(request) => request.validate(),
88 Self::Response(response) => response.validate(),
89 }
90 }
91}
92
93impl<T> From<T> for Payload
94where
95 T: Into<ErrorResponse>,
96{
97 fn from(value: T) -> Self {
98 Self::Response(Response::Error(value.into()))
99 }
100}
101
/// JSON-RPC response, either successful or an error.
///
/// Serialized untagged: the variant is distinguished by the fields present
/// (`result` vs `error`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Response {
    /// Response carrying a `result` value.
    Success(SuccessfulResponse),

    /// Response carrying an `error` object.
    Error(ErrorResponse),
}
112
113impl Response {
114 pub fn id(&self) -> MessageId {
115 match self {
116 Self::Success(response) => response.id,
117 Self::Error(response) => response.id,
118 }
119 }
120
121 pub fn validate(&self) -> Result<(), PayloadError> {
123 match self {
124 Self::Success(response) => response.validate(),
125 Self::Error(response) => response.validate(),
126 }
127 }
128}
129
/// JSON-RPC response reporting success.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SuccessfulResponse {
    /// ID of the message this response belongs to.
    pub id: MessageId,

    /// JSON-RPC version string; must equal [`JSON_RPC_VERSION_STR`] to validate.
    pub jsonrpc: Arc<str>,

    /// Arbitrary JSON result value.
    pub result: serde_json::Value,
}
142
143impl SuccessfulResponse {
144 pub fn new(id: MessageId, result: serde_json::Value) -> Self {
146 Self {
147 id,
148 jsonrpc: JSON_RPC_VERSION.clone(),
149 result,
150 }
151 }
152
153 pub fn validate(&self) -> Result<(), PayloadError> {
155 if self.jsonrpc.as_ref() != JSON_RPC_VERSION_STR {
156 Err(PayloadError::InvalidJsonRpcVersion)
157 } else {
158 Ok(())
161 }
162 }
163}
164
/// JSON-RPC response reporting an error.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorResponse {
    /// ID of the message this response belongs to.
    pub id: MessageId,

    /// JSON-RPC version string; must equal [`JSON_RPC_VERSION_STR`] to validate.
    pub jsonrpc: Arc<str>,

    /// Description of the error.
    pub error: ErrorData,
}
177
178impl ErrorResponse {
179 pub fn new(id: MessageId, error: impl Into<ErrorData>) -> Self {
181 Self {
182 id,
183 jsonrpc: JSON_RPC_VERSION.clone(),
184 error: error.into(),
185 }
186 }
187
188 pub fn validate(&self) -> Result<(), PayloadError> {
190 if self.jsonrpc.as_ref() != JSON_RPC_VERSION_STR {
191 Err(PayloadError::InvalidJsonRpcVersion)
192 } else {
193 Ok(())
194 }
195 }
196}
197
/// Error object embedded in an [`ErrorResponse`].
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorData {
    /// Numeric error code.
    pub code: i32,

    /// Human-readable error message.
    pub message: String,

    /// Optional extra error data; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<String>,
}
211
/// Error type of the subscribe/unsubscribe family of requests.
#[derive(Debug, thiserror::Error, strum::EnumString, strum::IntoStaticStr, PartialEq, Eq)]
pub enum SubscriptionError {
    /// The subscriber limit has been exceeded.
    #[error("Subscriber limit exceeded")]
    SubscriberLimitExceeded,
}
217
/// Parameters of the `irn_subscribe` request (see [`Params::Subscribe`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Subscribe {
    /// Topic to subscribe to.
    pub topic: Topic,
}
226
227impl ServiceRequest for Subscribe {
228 type Error = SubscriptionError;
229 type Response = SubscriptionId;
230
231 fn validate(&self) -> Result<(), PayloadError> {
232 self.topic
233 .decode()
234 .map_err(|_| PayloadError::InvalidTopic)?;
235
236 Ok(())
237 }
238
239 fn into_params(self) -> Params {
240 Params::Subscribe(self)
241 }
242}
243
/// Parameters of the `irn_subscribeBlocking` request
/// (see [`Params::SubscribeBlocking`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscribeBlocking {
    /// Topic to subscribe to.
    pub topic: Topic,
}
251
252impl ServiceRequest for SubscribeBlocking {
253 type Error = SubscriptionError;
254 type Response = SubscriptionId;
255
256 fn validate(&self) -> Result<(), PayloadError> {
257 self.topic
258 .decode()
259 .map_err(|_| PayloadError::InvalidTopic)?;
260
261 Ok(())
262 }
263
264 fn into_params(self) -> Params {
265 Params::SubscribeBlocking(self)
266 }
267}
268
/// Parameters of the `irn_unsubscribe` request (see [`Params::Unsubscribe`]).
/// Also used as the element type of [`BatchUnsubscribe`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Unsubscribe {
    /// Topic to unsubscribe from.
    pub topic: Topic,
}
275
276impl ServiceRequest for Unsubscribe {
277 type Error = SubscriptionError;
278 type Response = bool;
279
280 fn validate(&self) -> Result<(), PayloadError> {
281 self.topic
282 .decode()
283 .map_err(|_| PayloadError::InvalidTopic)?;
284
285 Ok(())
295 }
296
297 fn into_params(self) -> Params {
298 Params::Unsubscribe(self)
299 }
300}
301
/// Parameters of the `irn_fetchMessages` request
/// (see [`Params::FetchMessages`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FetchMessages {
    /// Topic to fetch messages for.
    pub topic: Topic,
}
308
309impl ServiceRequest for FetchMessages {
310 type Error = GenericError;
311 type Response = FetchResponse;
312
313 fn validate(&self) -> Result<(), PayloadError> {
314 self.topic
315 .decode()
316 .map_err(|_| PayloadError::InvalidTopic)?;
317
318 Ok(())
319 }
320
321 fn into_params(self) -> Params {
322 Params::FetchMessages(self)
323 }
324}
325
/// Response to the fetch requests ([`FetchMessages`], [`BatchFetchMessages`]).
///
/// Field names are serialized in camelCase (`hasMore`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
    /// Messages returned by the fetch.
    pub messages: Vec<SubscriptionData>,

    /// NOTE(review): presumably signals that more messages remain to be
    /// fetched; confirm against the service implementation.
    pub has_more: bool,
}
337
/// Parameters of the `irn_batchSubscribe` request
/// (see [`Params::BatchSubscribe`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribe {
    /// Topics to subscribe to; must be non-empty and contain at most
    /// [`MAX_SUBSCRIPTION_BATCH_SIZE`] entries to pass validation.
    pub topics: Vec<Topic>,
}
346
347impl BatchSubscribe {
348 fn validate_topics(topics: &[Topic]) -> Result<(), PayloadError> {
349 let batch_size = topics.len();
350
351 if batch_size == 0 {
352 return Err(PayloadError::BatchEmpty);
353 }
354
355 if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
356 return Err(PayloadError::BatchLimitExceeded);
357 }
358
359 for topic in topics {
360 topic.decode().map_err(|_| PayloadError::InvalidTopic)?;
361 }
362
363 Ok(())
364 }
365}
366
367impl ServiceRequest for BatchSubscribe {
368 type Error = SubscriptionError;
369 type Response = Vec<SubscriptionId>;
370
371 fn validate(&self) -> Result<(), PayloadError> {
372 Self::validate_topics(&self.topics)
373 }
374
375 fn into_params(self) -> Params {
376 Params::BatchSubscribe(self)
377 }
378}
379
/// Parameters of the `irn_batchSubscribeBlocking` request
/// (see [`Params::BatchSubscribeBlocking`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchSubscribeBlocking {
    /// Topics to subscribe to; validated with the same rules as
    /// [`BatchSubscribe`].
    pub topics: Vec<Topic>,
}
387
/// Element of the [`BatchSubscribeBlocking`] response: either a subscription
/// ID or an error. Variant names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionResult {
    /// A subscription ID.
    Id(SubscriptionId),
    /// An error description.
    Error(ErrorData),
}
394
395impl ServiceRequest for BatchSubscribeBlocking {
396 type Error = SubscriptionError;
397 type Response = Vec<SubscriptionResult>;
398
399 fn validate(&self) -> Result<(), PayloadError> {
400 BatchSubscribe::validate_topics(&self.topics)
401 }
402
403 fn into_params(self) -> Params {
404 Params::BatchSubscribeBlocking(self)
405 }
406}
407
/// Parameters of the `irn_batchUnsubscribe` request
/// (see [`Params::BatchUnsubscribe`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchUnsubscribe {
    /// Subscriptions to cancel; must be non-empty and contain at most
    /// [`MAX_SUBSCRIPTION_BATCH_SIZE`] entries to pass validation.
    pub subscriptions: Vec<Unsubscribe>,
}
414
415impl ServiceRequest for BatchUnsubscribe {
416 type Error = SubscriptionError;
417 type Response = bool;
418
419 fn validate(&self) -> Result<(), PayloadError> {
420 let batch_size = self.subscriptions.len();
421
422 if batch_size == 0 {
423 return Err(PayloadError::BatchEmpty);
424 }
425
426 if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
427 return Err(PayloadError::BatchLimitExceeded);
428 }
429
430 for sub in &self.subscriptions {
431 sub.validate()?;
432 }
433
434 Ok(())
435 }
436
437 fn into_params(self) -> Params {
438 Params::BatchUnsubscribe(self)
439 }
440}
441
/// Parameters of the `irn_batchFetchMessages` request
/// (see [`Params::BatchFetchMessages`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchFetchMessages {
    /// Topics to fetch messages for; must be non-empty and contain at most
    /// [`MAX_FETCH_BATCH_SIZE`] entries to pass validation.
    pub topics: Vec<Topic>,
}
448
449impl ServiceRequest for BatchFetchMessages {
450 type Error = GenericError;
451 type Response = FetchResponse;
452
453 fn validate(&self) -> Result<(), PayloadError> {
454 let batch_size = self.topics.len();
455
456 if batch_size == 0 {
457 return Err(PayloadError::BatchEmpty);
458 }
459
460 if batch_size > MAX_FETCH_BATCH_SIZE {
461 return Err(PayloadError::BatchLimitExceeded);
462 }
463
464 for topic in &self.topics {
465 topic.decode().map_err(|_| PayloadError::InvalidTopic)?;
466 }
467
468 Ok(())
469 }
470
471 fn into_params(self) -> Params {
472 Params::BatchFetchMessages(self)
473 }
474}
475
/// Single entry of a [`BatchReceiveMessages`] request, pairing a topic with
/// a message ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Receipt {
    /// Topic of the message; must decode for the batch to validate.
    pub topic: Topic,

    /// ID of the message.
    pub message_id: MessageId,
}
485
/// Parameters of the `irn_batchReceive` request
/// (see [`Params::BatchReceiveMessages`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BatchReceiveMessages {
    /// Receipts in the batch; must be non-empty and contain at most
    /// [`MAX_RECEIVE_BATCH_SIZE`] entries to pass validation.
    pub receipts: Vec<Receipt>,
}
492
493impl ServiceRequest for BatchReceiveMessages {
494 type Error = GenericError;
495 type Response = bool;
496
497 fn validate(&self) -> Result<(), PayloadError> {
498 let batch_size = self.receipts.len();
499
500 if batch_size == 0 {
501 return Err(PayloadError::BatchEmpty);
502 }
503
504 if batch_size > MAX_RECEIVE_BATCH_SIZE {
505 return Err(PayloadError::BatchLimitExceeded);
506 }
507
508 for receipt in &self.receipts {
509 receipt
510 .topic
511 .decode()
512 .map_err(|_| PayloadError::InvalidTopic)?;
513 }
514
515 Ok(())
516 }
517
518 fn into_params(self) -> Params {
519 Params::BatchReceiveMessages(self)
520 }
521}
522
/// Parameters of the `irn_publish` request (see [`Params::Publish`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Publish {
    /// Topic to publish to.
    pub topic: Topic,

    /// Message payload.
    pub message: Arc<str>,

    /// Optional attestation; defaults to `None` when absent and is omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub attestation: Option<Arc<str>>,

    /// Message time-to-live, in seconds per the field name; serialized as `ttl`.
    #[serde(rename = "ttl")]
    pub ttl_secs: u32,

    /// Numeric tag; copied into the [`SubscriptionData`] built from this message.
    pub tag: u32,

    /// Defaults to `false` when absent and is omitted from the serialized
    /// form when `false`.
    /// NOTE(review): the meaning of the flag is not visible in this file.
    #[serde(default, skip_serializing_if = "is_default")]
    pub prompt: bool,
}
549
550impl Publish {
551 pub fn as_subscription(
553 &self,
554 subscription_id: SubscriptionId,
555 published_at: i64,
556 ) -> Subscription {
557 Subscription {
558 id: subscription_id,
559 data: SubscriptionData {
560 topic: self.topic.clone(),
561 message: self.message.clone(),
562 attestation: self.attestation.clone(),
563 published_at,
564 tag: self.tag,
565 },
566 }
567 }
568
569 pub fn as_subscription_request(
571 &self,
572 message_id: MessageId,
573 subscription_id: SubscriptionId,
574 published_at: i64,
575 ) -> Request {
576 Request {
577 id: message_id,
578 jsonrpc: JSON_RPC_VERSION.clone(),
579 params: Params::Subscription(self.as_subscription(subscription_id, published_at)),
580 }
581 }
582}
583
/// Error type of the [`Publish`] request.
#[derive(Debug, thiserror::Error, strum::EnumString, strum::IntoStaticStr, PartialEq, Eq)]
pub enum PublishError {
    /// The requested TTL is too short.
    #[error("TTL too short")]
    TtlTooShort,

    /// The requested TTL is too long.
    #[error("TTL too long")]
    TtlTooLong,

    /// The mailbox limit has been exceeded.
    #[error("Mailbox limit exceeded")]
    MailboxLimitExceeded,
}
595
596impl ServiceRequest for Publish {
597 type Error = PublishError;
598 type Response = bool;
599
600 fn validate(&self) -> Result<(), PayloadError> {
601 self.topic
602 .decode()
603 .map_err(|_| PayloadError::InvalidTopic)?;
604
605 Ok(())
606 }
607
608 fn into_params(self) -> Params {
609 Params::Publish(self)
610 }
611}
612
/// Returns `true` when `x` equals its type's `Default` value.
///
/// Used as a serde `skip_serializing_if` predicate so fields holding their
/// default value are left out of the serialized form.
///
/// No `'static` bound is required: the comparison only needs `Default` and
/// `PartialEq`, so borrowed values are accepted as well.
fn is_default<T>(x: &T) -> bool
where
    T: Default + PartialEq,
{
    *x == T::default()
}
619
/// Catch-all error type for requests without specific failure modes.
#[derive(Debug, thiserror::Error, strum::EnumString, strum::IntoStaticStr, PartialEq, Eq)]
pub enum GenericError {
    /// An unspecified error.
    #[error("Unknown error")]
    Unknown,
}
625
/// Error type of the watch register/unregister requests.
#[derive(Debug, thiserror::Error, strum::EnumString, strum::IntoStaticStr, PartialEq, Eq)]
pub enum WatchError {
    /// The TTL is invalid.
    #[error("Invalid TTL")]
    InvalidTtl,

    /// The service URL is invalid or too long.
    #[error("Service URL is invalid or too long")]
    InvalidServiceUrl,

    /// The webhook URL is invalid or too long.
    #[error("Webhook URL is invalid or too long")]
    InvalidWebhookUrl,

    /// The action is invalid.
    #[error("Invalid action")]
    InvalidAction,

    /// The JWT is invalid.
    #[error("Invalid JWT")]
    InvalidJwt,
}
643
/// Response to the [`WatchRegister`] request. Field names are serialized in
/// camelCase (`relayId`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WatchRegisterResponse {
    /// DID key identifying the relay.
    pub relay_id: DidKey,
}
650
/// Parameters of the `irn_watchRegister` request
/// (see [`Params::WatchRegister`]). Field names are serialized in camelCase
/// (`registerAuth`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WatchRegister {
    /// Authorization string for the registration.
    /// NOTE(review): presumably a JWT (cf. `WatchError::InvalidJwt`); confirm.
    pub register_auth: String,
}
658
659impl ServiceRequest for WatchRegister {
660 type Error = WatchError;
661 type Response = WatchRegisterResponse;
662
663 fn validate(&self) -> Result<(), PayloadError> {
664 Ok(())
665 }
666
667 fn into_params(self) -> Params {
668 Params::WatchRegister(self)
669 }
670}
671
/// Parameters of the `irn_watchUnregister` request
/// (see [`Params::WatchUnregister`]). Field names are serialized in camelCase
/// (`unregisterAuth`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WatchUnregister {
    /// Authorization string for the unregistration.
    /// NOTE(review): presumably a JWT (cf. `WatchError::InvalidJwt`); confirm.
    pub unregister_auth: String,
}
679
680impl ServiceRequest for WatchUnregister {
681 type Error = WatchError;
682 type Response = bool;
683
684 fn validate(&self) -> Result<(), PayloadError> {
685 Ok(())
686 }
687
688 fn into_params(self) -> Params {
689 Params::WatchUnregister(self)
690 }
691}
692
/// Parameters of the `irn_subscription` request
/// (see [`Params::Subscription`]); built from a published message by
/// [`Publish::as_subscription`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Subscription {
    /// ID of the subscription the data is delivered for.
    pub id: SubscriptionId,

    /// The delivered message data.
    pub data: SubscriptionData,
}
702
703impl ServiceRequest for Subscription {
704 type Error = GenericError;
705 type Response = bool;
706
707 fn validate(&self) -> Result<(), PayloadError> {
708 self.id
709 .decode()
710 .map_err(|_| PayloadError::InvalidSubscriptionId)?;
711
712 self.data
713 .topic
714 .decode()
715 .map_err(|_| PayloadError::InvalidTopic)?;
716
717 Ok(())
718 }
719
720 fn into_params(self) -> Params {
721 Params::Subscription(self)
722 }
723}
724
/// Message data delivered to a subscriber. Field names are serialized in
/// camelCase (`publishedAt`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionData {
    /// Topic the message was published to.
    pub topic: Topic,

    /// Message payload.
    pub message: Arc<str>,

    /// Optional attestation; defaults to `None` when absent and is omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub attestation: Option<Arc<str>>,

    /// Publication timestamp.
    /// NOTE(review): unit (seconds vs. milliseconds) is not visible here; confirm.
    pub published_at: i64,

    /// Numeric tag; defaults to `0` when absent and is omitted from the
    /// serialized form when `0`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tag: u32,
}
746
/// All supported request parameter sets, keyed by JSON-RPC method name.
///
/// Serialized adjacently tagged: the method name goes in `method` and the
/// parameters in `params`. Each variant serializes with its `irn_*` name and
/// additionally accepts the `iridium_*` alias on deserialization.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "method", content = "params")]
pub enum Params {
    /// Parameters of `irn_subscribe`.
    #[serde(rename = "irn_subscribe", alias = "iridium_subscribe")]
    Subscribe(Subscribe),

    /// Parameters of `irn_subscribeBlocking`.
    #[serde(rename = "irn_subscribeBlocking", alias = "iridium_subscribeBlocking")]
    SubscribeBlocking(SubscribeBlocking),

    /// Parameters of `irn_unsubscribe`.
    #[serde(rename = "irn_unsubscribe", alias = "iridium_unsubscribe")]
    Unsubscribe(Unsubscribe),

    /// Parameters of `irn_fetchMessages`.
    #[serde(rename = "irn_fetchMessages", alias = "iridium_fetchMessages")]
    FetchMessages(FetchMessages),

    /// Parameters of `irn_batchSubscribe`.
    #[serde(rename = "irn_batchSubscribe", alias = "iridium_batchSubscribe")]
    BatchSubscribe(BatchSubscribe),

    /// Parameters of `irn_batchSubscribeBlocking`.
    #[serde(
        rename = "irn_batchSubscribeBlocking",
        alias = "iridium_batchSubscribeBlocking"
    )]
    BatchSubscribeBlocking(BatchSubscribeBlocking),

    /// Parameters of `irn_batchUnsubscribe`.
    #[serde(rename = "irn_batchUnsubscribe", alias = "iridium_batchUnsubscribe")]
    BatchUnsubscribe(BatchUnsubscribe),

    /// Parameters of `irn_batchFetchMessages`.
    #[serde(
        rename = "irn_batchFetchMessages",
        alias = "iridium_batchFetchMessages"
    )]
    BatchFetchMessages(BatchFetchMessages),

    /// Parameters of `irn_publish`.
    #[serde(rename = "irn_publish", alias = "iridium_publish")]
    Publish(Publish),

    /// Parameters of `irn_batchReceive`.
    #[serde(rename = "irn_batchReceive", alias = "iridium_batchReceive")]
    BatchReceiveMessages(BatchReceiveMessages),

    /// Parameters of `irn_watchRegister`.
    #[serde(rename = "irn_watchRegister", alias = "iridium_watchRegister")]
    WatchRegister(WatchRegister),

    /// Parameters of `irn_watchUnregister`.
    #[serde(rename = "irn_watchUnregister", alias = "iridium_watchUnregister")]
    WatchUnregister(WatchUnregister),

    /// Parameters of `irn_subscription`, the request that carries published
    /// message data (see [`Publish::as_subscription_request`]).
    #[serde(rename = "irn_subscription", alias = "iridium_subscription")]
    Subscription(Subscription),
}
812
/// JSON-RPC request.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Request {
    /// ID of the request message.
    pub id: MessageId,

    /// JSON-RPC version string; must equal [`JSON_RPC_VERSION_STR`] to validate.
    pub jsonrpc: Arc<str>,

    /// Method and parameters. Flattened, so the `method` and `params` keys of
    /// [`Params`] appear at the top level of the serialized request.
    #[serde(flatten)]
    pub params: Params,
}
826
827impl Request {
828 pub fn new(id: MessageId, params: Params) -> Self {
830 Self {
831 id,
832 jsonrpc: JSON_RPC_VERSION_STR.into(),
833 params,
834 }
835 }
836
837 pub fn validate(&self) -> Result<(), PayloadError> {
839 if !self.id.validate() {
840 return Err(PayloadError::InvalidRequestId);
841 }
842
843 if self.jsonrpc.as_ref() != JSON_RPC_VERSION_STR {
844 return Err(PayloadError::InvalidJsonRpcVersion);
845 }
846
847 match &self.params {
848 Params::Subscribe(params) => params.validate(),
849 Params::SubscribeBlocking(params) => params.validate(),
850 Params::Unsubscribe(params) => params.validate(),
851 Params::FetchMessages(params) => params.validate(),
852 Params::BatchSubscribe(params) => params.validate(),
853 Params::BatchSubscribeBlocking(params) => params.validate(),
854 Params::BatchUnsubscribe(params) => params.validate(),
855 Params::BatchFetchMessages(params) => params.validate(),
856 Params::Publish(params) => params.validate(),
857 Params::BatchReceiveMessages(params) => params.validate(),
858 Params::WatchRegister(params) => params.validate(),
859 Params::WatchUnregister(params) => params.validate(),
860 Params::Subscription(params) => params.validate(),
861 }
862 }
863}