corda_rpc/
client.rs

1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::time::SystemTime;
4
5use oasis_amqp::{amqp, proto::BytesFrame};
6use rand::{self, Rng};
7use serde_bytes::Bytes;
8use tokio::net::ToSocketAddrs;
9use uuid::Uuid;
10
11use crate::types::Rpc;
12
13pub struct Client {
14    inner: oasis_amqp::Client,
15    user: String,
16    container: String,
17}
18
19impl Client {
20    pub async fn new<A: ToSocketAddrs>(
21        address: A,
22        user: String,
23        password: &str,
24        container: String,
25    ) -> Result<Self, ()> {
26        let mut inner = oasis_amqp::Client::connect(address).await.map_err(|_| ())?;
27        inner.login(&user, &password).await?;
28        inner.open(&container).await?;
29        inner.begin().await?;
30
31        let sender_name = format!("corda-rpc-{:x}", Uuid::new_v4().to_hyphenated());
32        inner
33            .attach(amqp::Attach {
34                name: &sender_name,
35                handle: 0,
36                role: amqp::Role::Sender,
37                snd_settle_mode: None,
38                rcv_settle_mode: None,
39                source: Some(amqp::Source {
40                    address: Some(&container),
41                    ..Default::default()
42                }),
43                target: Some(amqp::Target {
44                    address: Some("rpc.server"),
45                    ..Default::default()
46                }),
47                unsettled: None,
48                incomplete_unsettled: None,
49                initial_delivery_count: Some(0),
50                max_message_size: None,
51                offered_capabilities: None,
52                desired_capabilities: None,
53                properties: None,
54            })
55            .await?;
56
57        Ok(Self {
58            inner,
59            user,
60            container,
61        })
62    }
63
64    pub async fn call<'r, T: Rpc<'static>>(&mut self, rpc: &T) -> Result<BytesFrame, T::Error> {
65        let rcv_queue_name = format!(
66            "rpc.client.{}.{}",
67            self.user,
68            rand::thread_rng().gen::<u64>() & 0xefff_ffff_ffff_ffff,
69        );
70
71        self.inner
72            .attach(amqp::Attach {
73                name: &rcv_queue_name,
74                handle: 1,
75                role: amqp::Role::Receiver,
76                snd_settle_mode: None,
77                rcv_settle_mode: None,
78                source: Some(amqp::Source {
79                    address: Some(&rcv_queue_name),
80                    ..Default::default()
81                }),
82                target: Some(amqp::Target {
83                    address: Some(&self.container),
84                    ..Default::default()
85                }),
86                unsettled: None,
87                incomplete_unsettled: None,
88                initial_delivery_count: None,
89                max_message_size: None,
90                offered_capabilities: None,
91                desired_capabilities: None,
92                properties: None,
93            })
94            .await?;
95
96        self.inner
97            .flow(amqp::Flow {
98                next_incoming_id: Some(1),
99                incoming_window: 2_147_483_647,
100                next_outgoing_id: 1,
101                outgoing_window: 2_147_483_647,
102                handle: Some(1),
103                delivery_count: Some(0),
104                link_credit: Some(1000),
105                available: None,
106                drain: None,
107                echo: None,
108                properties: None,
109            })
110            .await?;
111
112        let now = SystemTime::now();
113        let timestamp = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
114        let timestamp = i64::try_from(timestamp.as_millis()).unwrap();
115
116        let rpc_id = format!("{:x}", Uuid::new_v4().to_hyphenated());
117        let rpc_session_id = format!("{:x}", Uuid::new_v4().to_hyphenated());
118        let delivery_tag = Uuid::new_v4();
119
120        let mut properties = HashMap::new();
121        properties.insert("_AMQ_VALIDATED_USER", amqp::Any::Str(&self.user));
122        properties.insert("tag", amqp::Any::I32(0));
123        properties.insert("method-name", amqp::Any::Str(rpc.method()));
124        properties.insert("rpc-id", amqp::Any::Str(&rpc_id));
125        properties.insert("rpc-id-timestamp", amqp::Any::I64(timestamp));
126        properties.insert("rpc-session-id", amqp::Any::Str(&rpc_session_id));
127        properties.insert("rpc-session-id-timestamp", amqp::Any::I64(timestamp));
128        properties.insert("deduplication-sequence-number", amqp::Any::I64(0));
129
130        let mut body = vec![];
131        rpc.request().encode(&mut body).unwrap();
132
133        self.inner
134            .transfer(
135                amqp::Transfer {
136                    handle: 0,
137                    delivery_id: Some(0),
138                    delivery_tag: Some(delivery_tag.as_bytes().to_vec()),
139                    message_format: Some(0),
140                    ..Default::default()
141                },
142                amqp::Message {
143                    properties: Some(amqp::Properties {
144                        message_id: Some(rpc_id.clone().into()),
145                        reply_to: Some(rcv_queue_name.clone().into()),
146                        user_id: Some(Bytes::new(self.user.as_bytes())),
147                        ..Default::default()
148                    }),
149                    application_properties: Some(amqp::ApplicationProperties(properties)),
150                    body: Some(amqp::Body::Data(amqp::Data(&body))),
151                    ..Default::default()
152                },
153            )
154            .await
155            .unwrap();
156
157        match self.inner.next().await {
158            Some(Ok(frame)) => Ok(frame),
159            _ => Err(().into()),
160        }
161    }
162}