1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::time::SystemTime;
4
5use oasis_amqp::{amqp, proto::BytesFrame};
6use rand::{self, Rng};
7use serde_bytes::Bytes;
8use tokio::net::ToSocketAddrs;
9use uuid::Uuid;
10
11use crate::types::Rpc;
12
13pub struct Client {
14 inner: oasis_amqp::Client,
15 user: String,
16 container: String,
17}
18
19impl Client {
20 pub async fn new<A: ToSocketAddrs>(
21 address: A,
22 user: String,
23 password: &str,
24 container: String,
25 ) -> Result<Self, ()> {
26 let mut inner = oasis_amqp::Client::connect(address).await.map_err(|_| ())?;
27 inner.login(&user, &password).await?;
28 inner.open(&container).await?;
29 inner.begin().await?;
30
31 let sender_name = format!("corda-rpc-{:x}", Uuid::new_v4().to_hyphenated());
32 inner
33 .attach(amqp::Attach {
34 name: &sender_name,
35 handle: 0,
36 role: amqp::Role::Sender,
37 snd_settle_mode: None,
38 rcv_settle_mode: None,
39 source: Some(amqp::Source {
40 address: Some(&container),
41 ..Default::default()
42 }),
43 target: Some(amqp::Target {
44 address: Some("rpc.server"),
45 ..Default::default()
46 }),
47 unsettled: None,
48 incomplete_unsettled: None,
49 initial_delivery_count: Some(0),
50 max_message_size: None,
51 offered_capabilities: None,
52 desired_capabilities: None,
53 properties: None,
54 })
55 .await?;
56
57 Ok(Self {
58 inner,
59 user,
60 container,
61 })
62 }
63
64 pub async fn call<'r, T: Rpc<'static>>(&mut self, rpc: &T) -> Result<BytesFrame, T::Error> {
65 let rcv_queue_name = format!(
66 "rpc.client.{}.{}",
67 self.user,
68 rand::thread_rng().gen::<u64>() & 0xefff_ffff_ffff_ffff,
69 );
70
71 self.inner
72 .attach(amqp::Attach {
73 name: &rcv_queue_name,
74 handle: 1,
75 role: amqp::Role::Receiver,
76 snd_settle_mode: None,
77 rcv_settle_mode: None,
78 source: Some(amqp::Source {
79 address: Some(&rcv_queue_name),
80 ..Default::default()
81 }),
82 target: Some(amqp::Target {
83 address: Some(&self.container),
84 ..Default::default()
85 }),
86 unsettled: None,
87 incomplete_unsettled: None,
88 initial_delivery_count: None,
89 max_message_size: None,
90 offered_capabilities: None,
91 desired_capabilities: None,
92 properties: None,
93 })
94 .await?;
95
96 self.inner
97 .flow(amqp::Flow {
98 next_incoming_id: Some(1),
99 incoming_window: 2_147_483_647,
100 next_outgoing_id: 1,
101 outgoing_window: 2_147_483_647,
102 handle: Some(1),
103 delivery_count: Some(0),
104 link_credit: Some(1000),
105 available: None,
106 drain: None,
107 echo: None,
108 properties: None,
109 })
110 .await?;
111
112 let now = SystemTime::now();
113 let timestamp = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
114 let timestamp = i64::try_from(timestamp.as_millis()).unwrap();
115
116 let rpc_id = format!("{:x}", Uuid::new_v4().to_hyphenated());
117 let rpc_session_id = format!("{:x}", Uuid::new_v4().to_hyphenated());
118 let delivery_tag = Uuid::new_v4();
119
120 let mut properties = HashMap::new();
121 properties.insert("_AMQ_VALIDATED_USER", amqp::Any::Str(&self.user));
122 properties.insert("tag", amqp::Any::I32(0));
123 properties.insert("method-name", amqp::Any::Str(rpc.method()));
124 properties.insert("rpc-id", amqp::Any::Str(&rpc_id));
125 properties.insert("rpc-id-timestamp", amqp::Any::I64(timestamp));
126 properties.insert("rpc-session-id", amqp::Any::Str(&rpc_session_id));
127 properties.insert("rpc-session-id-timestamp", amqp::Any::I64(timestamp));
128 properties.insert("deduplication-sequence-number", amqp::Any::I64(0));
129
130 let mut body = vec![];
131 rpc.request().encode(&mut body).unwrap();
132
133 self.inner
134 .transfer(
135 amqp::Transfer {
136 handle: 0,
137 delivery_id: Some(0),
138 delivery_tag: Some(delivery_tag.as_bytes().to_vec()),
139 message_format: Some(0),
140 ..Default::default()
141 },
142 amqp::Message {
143 properties: Some(amqp::Properties {
144 message_id: Some(rpc_id.clone().into()),
145 reply_to: Some(rcv_queue_name.clone().into()),
146 user_id: Some(Bytes::new(self.user.as_bytes())),
147 ..Default::default()
148 }),
149 application_properties: Some(amqp::ApplicationProperties(properties)),
150 body: Some(amqp::Body::Data(amqp::Data(&body))),
151 ..Default::default()
152 },
153 )
154 .await
155 .unwrap();
156
157 match self.inner.next().await {
158 Some(Ok(frame)) => Ok(frame),
159 _ => Err(().into()),
160 }
161 }
162}