ya_client/activity/requestor/
control.rs1use ya_client_model::activity::{
3 CreateActivityRequest, CreateActivityResult, ExeScriptCommandResult, ExeScriptRequest,
4 RuntimeEvent, ACTIVITY_API_PATH,
5};
6
7use crate::web::{default_on_timeout, Event, WebClient, WebInterface};
8use crate::{Error, Result};
9use futures::{Stream, StreamExt};
10use std::convert::TryFrom;
11
12#[derive(Clone)]
14pub struct ActivityRequestorControlApi {
15 client: WebClient,
16}
17
18impl WebInterface for ActivityRequestorControlApi {
19 const API_URL_ENV_VAR: &'static str = crate::activity::ACTIVITY_URL_ENV_VAR;
20 const API_SUFFIX: &'static str = ACTIVITY_API_PATH;
21
22 fn from_client(client: WebClient) -> Self {
23 ActivityRequestorControlApi { client }
24 }
25}
26
27impl ActivityRequestorControlApi {
28 pub async fn create_activity(&self, agreement_id: &str) -> Result<String> {
30 let r = CreateActivityRequest::new(agreement_id.to_owned());
31 let result: CreateActivityResult =
32 self.client.post("activity").send_json(&r).json().await?;
33 Ok(result.activity_id)
34 }
35
36 #[cfg(feature = "sgx")]
37 pub async fn create_secure_activity_raw(
38 &self,
39 agreement_id: &str,
40 pub_key: secp256k1::PublicKey,
41 ) -> Result<CreateActivityResult> {
42 let mut r = CreateActivityRequest::new(agreement_id.to_owned());
43 r.requestor_pub_key = Some(pub_key.to_string());
44 self.client.post("activity").send_json(&r).json().await
45 }
46
47 #[cfg(feature = "sgx")]
48 pub async fn create_secure_activity(
49 &self,
50 agreement_id: &str,
51 ) -> Result<sgx::SecureActivityRequestorApi> {
52 let s = secp256k1::Secp256k1::new();
53 let (secret, pub_key) = s.generate_keypair(&mut rand::thread_rng());
54 let result = self
55 .create_secure_activity_raw(agreement_id, pub_key)
56 .await?;
57 let api = sgx::SecureActivityRequestorApi::from_response(
58 self.client.clone(),
59 result.activity_id.clone(),
60 agreement_id,
61 result,
62 secret,
63 )
64 .await
65 .map_err(|e| crate::Error::InternalError(e.to_string()))?;
66 Ok(api)
67 }
68
69 pub async fn destroy_activity(&self, activity_id: &str) -> Result<()> {
71 let uri = url_format!("activity/{activity_id}");
72 self.client.delete(&uri).send().json().await?;
73 Ok(())
74 }
75
76 pub async fn exec(&self, script: ExeScriptRequest, activity_id: &str) -> Result<String> {
78 let uri = url_format!("activity/{activity_id}/exec");
79 self.client.post(&uri).send_json(&script).json().await
80 }
81
82 #[rustfmt::skip]
84 pub async fn get_exec_batch_results(
85 &self,
86 activity_id: &str,
87 batch_id: &str,
88 timeout: Option<f32>,
89 command_index: Option<usize>,
90 ) -> Result<Vec<ExeScriptCommandResult>> {
91 let uri = url_format!(
92 "activity/{activity_id}/exec/{batch_id}",
93 #[query] timeout,
94 #[query] command_index,
95 );
96 self.client.get(&uri).send().json().await.or_else(default_on_timeout)
97 }
98
99 pub async fn stream_exec_batch_results(
101 &self,
102 activity_id: &str,
103 batch_id: &str,
104 ) -> Result<impl Stream<Item = RuntimeEvent>> {
105 let uri = url_format!("activity/{activity_id}/exec/{batch_id}",);
106 let stream = self
107 .client
108 .event_stream(&uri)
109 .await?
110 .filter_map(|result| async {
111 match result {
112 Ok(evt) => RuntimeEvent::try_from(evt).ok(),
113 _ => None,
114 }
115 });
116 Ok(stream)
117 }
118}
119
120impl TryFrom<Event> for RuntimeEvent {
121 type Error = Error;
122
123 fn try_from(evt: Event) -> Result<Self> {
124 serde_json::from_str(evt.data.as_str()).map_err(Error::from)
125 }
126}
127
128#[cfg(feature = "sgx")]
129pub mod sgx {
130 use super::*;
131 use crate::market::MarketRequestorApi;
132 use crate::model::activity::encrypted as enc;
133 use crate::model::activity::{Credentials, ExeScriptCommand, SgxCredentials};
134 use crate::Error as AppError;
135 use crate::SGX_CONFIG;
136 use graphene_sgx::AttestationResponse;
137 use hex;
138 use secp256k1::{PublicKey, SecretKey};
139 use std::sync::Arc;
140 use ya_client_model::activity::encrypted::EncryptionCtx;
141 use ya_client_model::activity::ExeScriptCommandState;
142
143 #[derive(thiserror::Error, Debug)]
144 pub enum SgxError {
145 #[error("activity without keys")]
146 MissingKeys,
147 #[error("activity with unknown keys")]
148 InvalidKeys,
149 #[error("{0}")]
150 AttestationFailed(String),
151 #[error("invalid credentials: {0}")]
152 InvalidCredentials(String),
153 #[error("invalid agreement")]
154 InvalidAgreement,
155 #[error("YAGNA_APPKEY variable not set ({0})")]
156 InvalidAppKey(String),
157 #[error("internal error: {0}")]
158 InternalError(String),
159 }
160
161 macro_rules! map_error {
162 ($($type:ty => $error:path)*) => {
163 $(
164 impl From<$type> for SgxError {
165 fn from(err: $type) -> Self {
166 $error(err.to_string())
167 }
168 }
169 )*
170 };
171 }
172
173 map_error! {
174 hex::FromHexError => SgxError::InvalidCredentials
175 std::array::TryFromSliceError => SgxError::InvalidCredentials
176 std::env::VarError => SgxError::InvalidAppKey
177 }
178
179 struct Session {
180 activity_id: String,
181 #[allow(unused)]
182 enclave_key: PublicKey,
183 ctx: EncryptionCtx,
184 proof: SgxCredentials,
185 }
186
187 #[derive(Clone)]
188 pub struct SecureActivityRequestorApi {
189 client: WebClient,
190 session: Arc<Session>,
191 }
192
193 fn gen_id() -> String {
194 use rand::Rng;
195 let v: u128 = rand::thread_rng().gen();
196 format!("{:032x}", v)
197 }
198
199 impl SecureActivityRequestorApi {
200 pub async fn from_response(
201 client: WebClient,
202 activity_id: String,
203 agreement_id: &str,
204 response: CreateActivityResult,
205 requestor_key: SecretKey,
206 ) -> std::result::Result<Self, SgxError> {
207 let sgx: SgxCredentials = match response.credentials {
208 Some(Credentials::Sgx(sgx)) => sgx,
209 None => return Err(SgxError::MissingKeys),
210 Some(_) => return Err(SgxError::InvalidKeys),
211 };
212 let enclave_key = sgx.enclave_pub_key;
213 let ctx = EncryptionCtx::new(&enclave_key, &requestor_key);
214 let nonce = &activity_id.to_owned();
215 let session = Arc::new(Session {
216 activity_id,
217 enclave_key,
218 ctx,
219 proof: sgx.clone(),
220 });
221
222 if SGX_CONFIG.enable_attestation {
223 let agreement = WebClient::builder()
224 .auth_token(&std::env::var("YAGNA_APPKEY")?)
225 .build()
226 .interface::<MarketRequestorApi>()
227 .map_err(|e| SgxError::InternalError(e.to_string()))?
228 .get_agreement(agreement_id)
229 .await
230 .map_err(|e| SgxError::InternalError(e.to_string()))?;
231
232 log::debug!("Agreement: {:?}", &agreement);
233
234 let task_package = agreement
235 .demand
236 .properties
237 .get("golem.srv.comp.task_package")
238 .ok_or(SgxError::InvalidAgreement)?
239 .as_str()
240 .ok_or(SgxError::InvalidAgreement)?;
241
242 let evidence = AttestationResponse::new(sgx.ias_report.clone(), &sgx.ias_sig);
243 let mut verifier = evidence.verifier();
244 verifier = verifier
245 .data(&sgx.requestor_pub_key.serialize())
246 .data(&sgx.enclave_pub_key.serialize())
247 .data(task_package.as_bytes())
248 .mr_enclave_list(&SGX_CONFIG.exeunit_hashes)
249 .nonce(nonce)
250 .max_age(SGX_CONFIG.max_evidence_age);
251
252 if !SGX_CONFIG.allow_debug {
253 verifier = verifier.not_debug();
254 }
255
256 if !SGX_CONFIG.allow_outdated_tcb {
257 verifier = verifier.not_outdated();
258 }
259
260 let attestation_result = verifier.check();
261 if attestation_result.is_ok() {
262 log::info!("Attestation OK");
263 Ok(SecureActivityRequestorApi { client, session })
264 } else {
265 log::warn!("Attestation failed: {:?}", attestation_result);
266 Err(SgxError::AttestationFailed(format!(
267 "{:?}",
268 attestation_result
269 )))
270 }
271 } else {
272 log::info!("Attestation disabled");
273 Ok(SecureActivityRequestorApi { client, session })
274 }
275 }
276
277 pub fn proof(&self) -> Credentials {
278 Credentials::Sgx(self.session.proof.clone())
279 }
280
281 pub fn activity_id(&self) -> String {
282 self.session.activity_id.clone()
283 }
284
285 pub async fn exec(&self, exe_script: Vec<ExeScriptCommand>) -> Result<String> {
286 let request = enc::Request {
287 activity_id: self.session.activity_id.clone(),
288 batch_id: gen_id(),
289 timeout: None,
290 command: enc::RequestCommand::Exec { exe_script },
291 };
292 let resp = match self.send(request).await? {
293 enc::Response::Exec(r) => r,
294 enc::Response::Error(e) => Err(e),
295 _ => return Err(AppError::InternalError("invalid response".to_string())),
296 };
297 resp.map_err(|e| AppError::InternalError(e.to_string()))
298 }
299
300 pub async fn get_exec_batch_results(
301 &self,
302 batch_id: &str,
303 timeout: Option<f32>,
304 command_index: Option<usize>,
305 ) -> Result<Vec<ExeScriptCommandResult>> {
306 let request = enc::Request {
307 activity_id: self.session.activity_id.clone(),
308 batch_id: batch_id.to_string(),
309 timeout,
310 command: enc::RequestCommand::GetExecBatchResults { command_index },
311 };
312 let resp = match self.send(request).await? {
313 enc::Response::GetExecBatchResults(r) => r,
314 enc::Response::Error(e) => Err(e),
315 _ => return Err(AppError::InternalError("invalid response".to_string())),
316 };
317 resp.map_err(|e| AppError::InternalError(e.to_string()))
318 }
319
320 pub async fn get_running_command(
321 &self,
322 timeout: Option<f32>,
323 ) -> Result<Vec<ExeScriptCommandState>> {
324 let request = enc::Request {
325 activity_id: self.session.activity_id.clone(),
326 batch_id: String::new(),
327 timeout,
328 command: enc::RequestCommand::GetRunningCommand,
329 };
330 let resp = match self.send(request).await? {
331 enc::Response::GetRunningCommand(r) => r,
332 enc::Response::Error(e) => Err(e),
333 _ => return Err(AppError::InternalError("invalid response".to_string())),
334 };
335 resp.map_err(|e| AppError::InternalError(e.to_string()))
336 }
337
338 async fn send(&self, request: enc::Request) -> Result<enc::Response> {
339 let bytes = self
340 .session
341 .ctx
342 .encrypt(&request)
343 .map_err(|e| AppError::InternalError(e.to_string()))?;
344 let uri = format!(
345 "activity/{activity_id}/encrypted",
346 activity_id = self.session.activity_id
347 );
348 let response = self
349 .session
350 .ctx
351 .decrypt(&self.client.post(&uri).send_bytes(bytes).bytes().await?)
352 .map_err(|e| AppError::InternalError(e.to_string()))?;
353 Ok(response)
354 }
355 }
356}
357
#[cfg(test)]
mod test {

    /// Bytes encrypted by one party must decrypt to the same bytes on the
    /// other side when each context is built from its own secret key and the
    /// peer's public key.
    #[test]
    #[cfg(feature = "sgx")]
    fn test_encdec() {
        use crate::model::activity::encrypted::EncryptionCtx;
        use rand::Rng;

        let mut rng = rand::thread_rng();
        let secp = secp256k1::Secp256k1::new();
        let (secret_a, public_a) = secp.generate_keypair(&mut rng);
        let (secret_b, public_b) = secp.generate_keypair(&mut rng);

        let sender = EncryptionCtx::new(&public_b, &secret_a);
        let receiver = EncryptionCtx::new(&public_a, &secret_b);

        let plain: [u8; 20] = rng.gen();
        let cipher = sender.encrypt_bytes(&plain).unwrap();
        let recovered = receiver.decrypt_bytes(&cipher).unwrap();

        assert_eq!(recovered.as_slice(), plain.as_ref())
    }
}