ya_client/activity/requestor/
control.rs

1//! Requestor control part of Activity API
2use ya_client_model::activity::{
3    CreateActivityRequest, CreateActivityResult, ExeScriptCommandResult, ExeScriptRequest,
4    RuntimeEvent, ACTIVITY_API_PATH,
5};
6
7use crate::web::{default_on_timeout, Event, WebClient, WebInterface};
8use crate::{Error, Result};
9use futures::{Stream, StreamExt};
10use std::convert::TryFrom;
11
/// Bindings for Requestor Control part of the Activity API.
///
/// Every method of this type issues its request through the wrapped
/// [`WebClient`]. Instances are created through `WebInterface::from_client`.
#[derive(Clone)]
pub struct ActivityRequestorControlApi {
    /// HTTP client used for all requests made by this API object.
    client: WebClient,
}
17
18impl WebInterface for ActivityRequestorControlApi {
19    const API_URL_ENV_VAR: &'static str = crate::activity::ACTIVITY_URL_ENV_VAR;
20    const API_SUFFIX: &'static str = ACTIVITY_API_PATH;
21
22    fn from_client(client: WebClient) -> Self {
23        ActivityRequestorControlApi { client }
24    }
25}
26
27impl ActivityRequestorControlApi {
28    /// Creates new Activity based on given Agreement.
29    pub async fn create_activity(&self, agreement_id: &str) -> Result<String> {
30        let r = CreateActivityRequest::new(agreement_id.to_owned());
31        let result: CreateActivityResult =
32            self.client.post("activity").send_json(&r).json().await?;
33        Ok(result.activity_id)
34    }
35
36    #[cfg(feature = "sgx")]
37    pub async fn create_secure_activity_raw(
38        &self,
39        agreement_id: &str,
40        pub_key: secp256k1::PublicKey,
41    ) -> Result<CreateActivityResult> {
42        let mut r = CreateActivityRequest::new(agreement_id.to_owned());
43        r.requestor_pub_key = Some(pub_key.to_string());
44        self.client.post("activity").send_json(&r).json().await
45    }
46
47    #[cfg(feature = "sgx")]
48    pub async fn create_secure_activity(
49        &self,
50        agreement_id: &str,
51    ) -> Result<sgx::SecureActivityRequestorApi> {
52        let s = secp256k1::Secp256k1::new();
53        let (secret, pub_key) = s.generate_keypair(&mut rand::thread_rng());
54        let result = self
55            .create_secure_activity_raw(agreement_id, pub_key)
56            .await?;
57        let api = sgx::SecureActivityRequestorApi::from_response(
58            self.client.clone(),
59            result.activity_id.clone(),
60            agreement_id,
61            result,
62            secret,
63        )
64        .await
65        .map_err(|e| crate::Error::InternalError(e.to_string()))?;
66        Ok(api)
67    }
68
69    /// Destroys given Activity.
70    pub async fn destroy_activity(&self, activity_id: &str) -> Result<()> {
71        let uri = url_format!("activity/{activity_id}");
72        self.client.delete(&uri).send().json().await?;
73        Ok(())
74    }
75
76    /// Executes an ExeScript batch within a given Activity.
77    pub async fn exec(&self, script: ExeScriptRequest, activity_id: &str) -> Result<String> {
78        let uri = url_format!("activity/{activity_id}/exec");
79        self.client.post(&uri).send_json(&script).json().await
80    }
81
82    /// Queries for ExeScript batch results.
83    #[rustfmt::skip]
84    pub async fn get_exec_batch_results(
85        &self,
86        activity_id: &str,
87        batch_id: &str,
88        timeout: Option<f32>,
89        command_index: Option<usize>,
90    ) -> Result<Vec<ExeScriptCommandResult>> {
91        let uri = url_format!(
92            "activity/{activity_id}/exec/{batch_id}",
93            #[query] timeout,
94            #[query] command_index,
95        );
96        self.client.get(&uri).send().json().await.or_else(default_on_timeout)
97    }
98
99    /// Streams ExeScript batch results
100    pub async fn stream_exec_batch_results(
101        &self,
102        activity_id: &str,
103        batch_id: &str,
104    ) -> Result<impl Stream<Item = RuntimeEvent>> {
105        let uri = url_format!("activity/{activity_id}/exec/{batch_id}",);
106        let stream = self
107            .client
108            .event_stream(&uri)
109            .await?
110            .filter_map(|result| async {
111                match result {
112                    Ok(evt) => RuntimeEvent::try_from(evt).ok(),
113                    _ => None,
114                }
115            });
116        Ok(stream)
117    }
118}
119
120impl TryFrom<Event> for RuntimeEvent {
121    type Error = Error;
122
123    fn try_from(evt: Event) -> Result<Self> {
124        serde_json::from_str(evt.data.as_str()).map_err(Error::from)
125    }
126}
127
128#[cfg(feature = "sgx")]
129pub mod sgx {
130    use super::*;
131    use crate::market::MarketRequestorApi;
132    use crate::model::activity::encrypted as enc;
133    use crate::model::activity::{Credentials, ExeScriptCommand, SgxCredentials};
134    use crate::Error as AppError;
135    use crate::SGX_CONFIG;
136    use graphene_sgx::AttestationResponse;
137    use hex;
138    use secp256k1::{PublicKey, SecretKey};
139    use std::sync::Arc;
140    use ya_client_model::activity::encrypted::EncryptionCtx;
141    use ya_client_model::activity::ExeScriptCommandState;
142
    /// Errors reported while setting up a secure (SGX) activity session.
    #[derive(thiserror::Error, Debug)]
    pub enum SgxError {
        /// The create-activity response carried no credentials at all.
        #[error("activity without keys")]
        MissingKeys,
        /// The create-activity response carried credentials that are not SGX credentials.
        #[error("activity with unknown keys")]
        InvalidKeys,
        /// The attestation verifier rejected the evidence; the payload is the
        /// debug rendering of the verifier's result.
        #[error("{0}")]
        AttestationFailed(String),
        /// Produced by the `From` conversions for `hex::FromHexError` and
        /// `std::array::TryFromSliceError`.
        #[error("invalid credentials: {0}")]
        InvalidCredentials(String),
        /// The agreement's demand lacks a string-valued
        /// `golem.srv.comp.task_package` property.
        #[error("invalid agreement")]
        InvalidAgreement,
        /// Produced by the `From` conversion for `std::env::VarError`
        /// (reading `YAGNA_APPKEY` failed).
        #[error("YAGNA_APPKEY variable not set ({0})")]
        InvalidAppKey(String),
        /// Any other failure, e.g. an error returned by the market API client.
        #[error("internal error: {0}")]
        InternalError(String),
    }
160
    // Generates one `impl From<$type> for SgxError` per `$type => $error` pair.
    // Each impl stores the source error's `to_string()` output in the given
    // variant, so `$error` must be a variant holding a single `String`.
    macro_rules! map_error {
        ($($type:ty => $error:path)*) => {
            $(
                impl From<$type> for SgxError {
                    fn from(err: $type) -> Self {
                        $error(err.to_string())
                    }
                }
            )*
        };
    }

    // Conversions that let `?` turn these errors into `SgxError`
    // (e.g. `std::env::var("YAGNA_APPKEY")?` in `from_response`).
    map_error! {
        hex::FromHexError => SgxError::InvalidCredentials
        std::array::TryFromSliceError => SgxError::InvalidCredentials
        std::env::VarError => SgxError::InvalidAppKey
    }
178
    /// State of one secure activity session.
    ///
    /// Stored behind an `Arc` in `SecureActivityRequestorApi`, so every clone
    /// of the API object refers to the same session.
    struct Session {
        /// Id of the activity; used to build the encrypted endpoint URI and
        /// copied into every outgoing request.
        activity_id: String,
        // Not read anywhere in this module, hence the lint override.
        #[allow(unused)]
        /// Enclave public key taken from the SGX credentials.
        enclave_key: PublicKey,
        /// Encryption context built from the enclave public key and the
        /// requestor secret key; encrypts requests and decrypts responses.
        ctx: EncryptionCtx,
        /// SGX credentials from the create-activity response, exposed via `proof()`.
        proof: SgxCredentials,
    }
186
    /// Requestor-side API for an activity whose requests and responses are
    /// exchanged in encrypted form.
    ///
    /// Built with `from_response`. Cloning copies the client and shares the
    /// session through the `Arc`.
    #[derive(Clone)]
    pub struct SecureActivityRequestorApi {
        /// HTTP client used to post the encrypted requests.
        client: WebClient,
        /// Shared session state (activity id, encryption context, credentials).
        session: Arc<Session>,
    }
192
193    fn gen_id() -> String {
194        use rand::Rng;
195        let v: u128 = rand::thread_rng().gen();
196        format!("{:032x}", v)
197    }
198
199    impl SecureActivityRequestorApi {
200        pub async fn from_response(
201            client: WebClient,
202            activity_id: String,
203            agreement_id: &str,
204            response: CreateActivityResult,
205            requestor_key: SecretKey,
206        ) -> std::result::Result<Self, SgxError> {
207            let sgx: SgxCredentials = match response.credentials {
208                Some(Credentials::Sgx(sgx)) => sgx,
209                None => return Err(SgxError::MissingKeys),
210                Some(_) => return Err(SgxError::InvalidKeys),
211            };
212            let enclave_key = sgx.enclave_pub_key;
213            let ctx = EncryptionCtx::new(&enclave_key, &requestor_key);
214            let nonce = &activity_id.to_owned();
215            let session = Arc::new(Session {
216                activity_id,
217                enclave_key,
218                ctx,
219                proof: sgx.clone(),
220            });
221
222            if SGX_CONFIG.enable_attestation {
223                let agreement = WebClient::builder()
224                    .auth_token(&std::env::var("YAGNA_APPKEY")?)
225                    .build()
226                    .interface::<MarketRequestorApi>()
227                    .map_err(|e| SgxError::InternalError(e.to_string()))?
228                    .get_agreement(agreement_id)
229                    .await
230                    .map_err(|e| SgxError::InternalError(e.to_string()))?;
231
232                log::debug!("Agreement: {:?}", &agreement);
233
234                let task_package = agreement
235                    .demand
236                    .properties
237                    .get("golem.srv.comp.task_package")
238                    .ok_or(SgxError::InvalidAgreement)?
239                    .as_str()
240                    .ok_or(SgxError::InvalidAgreement)?;
241
242                let evidence = AttestationResponse::new(sgx.ias_report.clone(), &sgx.ias_sig);
243                let mut verifier = evidence.verifier();
244                verifier = verifier
245                    .data(&sgx.requestor_pub_key.serialize())
246                    .data(&sgx.enclave_pub_key.serialize())
247                    .data(task_package.as_bytes())
248                    .mr_enclave_list(&SGX_CONFIG.exeunit_hashes)
249                    .nonce(nonce)
250                    .max_age(SGX_CONFIG.max_evidence_age);
251
252                if !SGX_CONFIG.allow_debug {
253                    verifier = verifier.not_debug();
254                }
255
256                if !SGX_CONFIG.allow_outdated_tcb {
257                    verifier = verifier.not_outdated();
258                }
259
260                let attestation_result = verifier.check();
261                if attestation_result.is_ok() {
262                    log::info!("Attestation OK");
263                    Ok(SecureActivityRequestorApi { client, session })
264                } else {
265                    log::warn!("Attestation failed: {:?}", attestation_result);
266                    Err(SgxError::AttestationFailed(format!(
267                        "{:?}",
268                        attestation_result
269                    )))
270                }
271            } else {
272                log::info!("Attestation disabled");
273                Ok(SecureActivityRequestorApi { client, session })
274            }
275        }
276
277        pub fn proof(&self) -> Credentials {
278            Credentials::Sgx(self.session.proof.clone())
279        }
280
281        pub fn activity_id(&self) -> String {
282            self.session.activity_id.clone()
283        }
284
285        pub async fn exec(&self, exe_script: Vec<ExeScriptCommand>) -> Result<String> {
286            let request = enc::Request {
287                activity_id: self.session.activity_id.clone(),
288                batch_id: gen_id(),
289                timeout: None,
290                command: enc::RequestCommand::Exec { exe_script },
291            };
292            let resp = match self.send(request).await? {
293                enc::Response::Exec(r) => r,
294                enc::Response::Error(e) => Err(e),
295                _ => return Err(AppError::InternalError("invalid response".to_string())),
296            };
297            resp.map_err(|e| AppError::InternalError(e.to_string()))
298        }
299
300        pub async fn get_exec_batch_results(
301            &self,
302            batch_id: &str,
303            timeout: Option<f32>,
304            command_index: Option<usize>,
305        ) -> Result<Vec<ExeScriptCommandResult>> {
306            let request = enc::Request {
307                activity_id: self.session.activity_id.clone(),
308                batch_id: batch_id.to_string(),
309                timeout,
310                command: enc::RequestCommand::GetExecBatchResults { command_index },
311            };
312            let resp = match self.send(request).await? {
313                enc::Response::GetExecBatchResults(r) => r,
314                enc::Response::Error(e) => Err(e),
315                _ => return Err(AppError::InternalError("invalid response".to_string())),
316            };
317            resp.map_err(|e| AppError::InternalError(e.to_string()))
318        }
319
320        pub async fn get_running_command(
321            &self,
322            timeout: Option<f32>,
323        ) -> Result<Vec<ExeScriptCommandState>> {
324            let request = enc::Request {
325                activity_id: self.session.activity_id.clone(),
326                batch_id: String::new(),
327                timeout,
328                command: enc::RequestCommand::GetRunningCommand,
329            };
330            let resp = match self.send(request).await? {
331                enc::Response::GetRunningCommand(r) => r,
332                enc::Response::Error(e) => Err(e),
333                _ => return Err(AppError::InternalError("invalid response".to_string())),
334            };
335            resp.map_err(|e| AppError::InternalError(e.to_string()))
336        }
337
338        async fn send(&self, request: enc::Request) -> Result<enc::Response> {
339            let bytes = self
340                .session
341                .ctx
342                .encrypt(&request)
343                .map_err(|e| AppError::InternalError(e.to_string()))?;
344            let uri = format!(
345                "activity/{activity_id}/encrypted",
346                activity_id = self.session.activity_id
347            );
348            let response = self
349                .session
350                .ctx
351                .decrypt(&self.client.post(&uri).send_bytes(bytes).bytes().await?)
352                .map_err(|e| AppError::InternalError(e.to_string()))?;
353            Ok(response)
354        }
355    }
356}
357
#[cfg(test)]
mod test {

    /// Round-trips 20 random bytes between two encryption contexts built from
    /// mirrored key pairs: what one side encrypts, the other must decrypt to
    /// the original bytes.
    #[test]
    #[cfg(feature = "sgx")]
    fn test_encdec() {
        use crate::model::activity::encrypted::EncryptionCtx;
        use rand::Rng;

        let mut rng = rand::thread_rng();
        let secp = secp256k1::Secp256k1::new();
        let (secret_a, public_a) = secp.generate_keypair(&mut rng);
        let (secret_b, public_b) = secp.generate_keypair(&mut rng);

        // Each side combines its own secret key with the peer's public key.
        let sender = EncryptionCtx::new(&public_b, &secret_a);
        let receiver = EncryptionCtx::new(&public_a, &secret_b);

        let plaintext: [u8; 20] = rng.gen();
        let ciphertext = sender.encrypt_bytes(&plaintext).unwrap();
        let decrypted = receiver.decrypt_bytes(&ciphertext).unwrap();

        assert_eq!(decrypted.as_slice(), plaintext.as_ref())
    }
}