1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use base64;
use chrono::Utc;
use futures::{
future::{ok, Either},
Future,
};
use interledger::{
packet::Address,
service::{Account, BoxedIlpFuture, OutgoingRequest, OutgoingService, Username},
};
use parking_lot::Mutex;
use reqwest::r#async::Client;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::spawn;
use tracing::{error, info};
use yup_oauth2::{service_account_key_from_file, GetToken, ServiceAccountAccess};
static TOKEN_SCOPES: Option<&str> = Some("https://www.googleapis.com/auth/pubsub");
/// Configuration for the Google PubSub packet publisher
#[derive(Deserialize, Clone, Debug)]
pub struct PubsubConfig {
/// Path to the Service Account Key JSON file.
/// You can obtain this file by logging into [console.cloud.google.com](https://console.cloud.google.com/)
service_account_credentials: String,
project_id: String,
topic: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PubsubMessage {
message_id: Option<String>,
data: Option<String>,
attributes: Option<HashMap<String, String>>,
publish_time: Option<String>,
}
#[derive(Serialize)]
struct PubsubRequest {
messages: Vec<PubsubMessage>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PacketRecord {
prev_hop_account: Username,
prev_hop_asset_code: String,
prev_hop_asset_scale: u8,
prev_hop_amount: u64,
next_hop_account: Username,
next_hop_asset_code: String,
next_hop_asset_scale: u8,
next_hop_amount: u64,
destination_ilp_address: Address,
fulfillment: String,
timestamp: String,
}
/// Create an Interledger service wrapper that publishes records
/// of fulfilled packets to Google Cloud PubSub.
///
/// This is an experimental feature that may be removed in the future.
pub fn create_google_pubsub_wrapper<
A: Account + 'static,
O: OutgoingService<A> + Clone + Send + 'static,
>(
config: Option<PubsubConfig>,
) -> impl Fn(OutgoingRequest<A>, O) -> BoxedIlpFuture + Clone {
// If Google credentials were passed in, create an HTTP client and
// OAuth2 client that will automatically fetch and cache access tokens
let utilities = if let Some(config) = config {
let key = service_account_key_from_file(config.service_account_credentials.as_str())
.expect("Unable to load Google Cloud credentials from file");
let access = ServiceAccountAccess::new(key);
// This needs to be wrapped in a Mutex because the .token()
// method takes a mutable reference to self and we want to
// reuse the same fetcher so that it caches the tokens
let token_fetcher = Arc::new(Mutex::new(access.build()));
// TODO make sure the client uses HTTP/2
let client = Client::new();
let api_endpoint = Arc::new(format!(
"https://pubsub.googleapis.com/v1/projects/{}/topics/{}:publish",
config.project_id, config.topic
));
info!("Fulfilled packets will be submitted to Google Cloud Pubsub (project ID: {}, topic: {})", config.project_id, config.topic);
Some((client, api_endpoint, token_fetcher))
} else {
None
};
move |request: OutgoingRequest<A>, mut next: O| -> BoxedIlpFuture {
match &utilities {
// Just pass the request on if no Google Pubsub details were configured
None => Box::new(next.send_request(request)),
Some((client, api_endpoint, token_fetcher)) => {
let prev_hop_account = request.from.username().clone();
let prev_hop_asset_code = request.from.asset_code().to_string();
let prev_hop_asset_scale = request.from.asset_scale();
let prev_hop_amount = request.original_amount;
let next_hop_account = request.to.username().clone();
let next_hop_asset_code = request.to.asset_code().to_string();
let next_hop_asset_scale = request.to.asset_scale();
let next_hop_amount = request.prepare.amount();
let destination_ilp_address = request.prepare.destination();
let client = client.clone();
let api_endpoint = api_endpoint.clone();
let token_fetcher = token_fetcher.clone();
Box::new(next.send_request(request).map(move |fulfill| {
// Only fulfilled packets are published for now
let fulfillment = base64::encode(fulfill.fulfillment());
let get_token_future = token_fetcher.lock()
.token(TOKEN_SCOPES)
.map_err(|err| {
error!("Error fetching OAuth token for Google PubSub: {:?}", err)
});
// Spawn a task to submit the packet to PubSub so we
// don't block returning the fulfillment
// Note this means that if there is a problem submitting the
// packet record to PubSub, it will only log an error
spawn(
get_token_future
.and_then(move |token| {
let record = PacketRecord {
prev_hop_account,
prev_hop_asset_code,
prev_hop_asset_scale,
prev_hop_amount,
next_hop_account,
next_hop_asset_code,
next_hop_asset_scale,
next_hop_amount,
destination_ilp_address,
fulfillment,
timestamp: Utc::now().to_rfc3339(),
};
let data = base64::encode(&serde_json::to_string(&record).unwrap());
client
.post(api_endpoint.as_str())
.bearer_auth(token.access_token)
.json(&PubsubRequest {
messages: vec![PubsubMessage {
// TODO should there be an ID?
message_id: None,
data: Some(data),
attributes: None,
publish_time: None,
}],
})
.send()
.map_err(|err| {
error!(
"Error sending packet details to Google PubSub: {:?}",
err
)
})
.and_then(|mut res| {
if res.status().is_success() {
Either::A(ok(()))
} else {
let status = res.status();
Either::B(res.text()
.map_err(|err| error!("Error getting response body: {:?}", err))
.and_then(move |body| {
error!(
%status,
"Error sending packet details to Google PubSub: {}",
body
);
Ok(())
}))
}
})
}),
);
fulfill
}))
}
}
}
}