1use crate::{
4 helper,
5 response::{Rejection, Response, ResponseCode},
6};
7use bytes::Bytes;
8use multer::Multipart;
9use serde::de::DeserializeOwned;
10use std::{borrow::Cow, net::IpAddr, str::FromStr, sync::Arc, time::Instant};
11use zino_channel::{CloudEvent, Subscription};
12use zino_core::{
13 JsonValue, Map, SharedString, Uuid,
14 application::Agent,
15 error::Error,
16 extension::HeaderMapExt,
17 model::{ModelHooks, Query},
18 trace::{TraceContext, TraceState},
19 warn,
20};
21use zino_storage::NamedFile;
22
23#[cfg(feature = "auth")]
24use zino_auth::{AccessKeyId, Authentication, ParseSecurityTokenError, SecurityToken, SessionId};
25
26#[cfg(feature = "auth")]
27use zino_core::{datetime::DateTime, extension::JsonObjectExt, validation::Validation};
28
29#[cfg(feature = "cookie")]
30use cookie::{Cookie, SameSite};
31
32#[cfg(feature = "jwt")]
33use jwt_simple::algorithms::MACLike;
34#[cfg(feature = "jwt")]
35use zino_auth::JwtClaims;
36
37#[cfg(any(feature = "cookie", feature = "jwt"))]
38use std::time::Duration;
39
40#[cfg(feature = "i18n")]
41use fluent::FluentArgs;
42#[cfg(feature = "i18n")]
43use unic_langid::LanguageIdentifier;
44#[cfg(feature = "i18n")]
45use zino_core::i18n::{Intl, IntlError};
46
47mod context;
48
49pub use context::Context;
50
51pub trait RequestContext {
53 type Method: AsRef<str>;
55 type Uri;
57
58 fn request_method(&self) -> &Self::Method;
60
61 fn original_uri(&self) -> &Self::Uri;
63
64 fn matched_route(&self) -> Cow<'_, str>;
66
67 fn request_path(&self) -> &str;
69
70 fn get_query_string(&self) -> Option<&str>;
72
73 fn get_header(&self, name: &str) -> Option<&str>;
75
76 fn client_ip(&self) -> Option<IpAddr>;
78
79 fn get_context(&self) -> Option<Arc<Context>>;
81
82 fn get_data<T: Clone + Send + Sync + 'static>(&self) -> Option<T>;
84
85 fn set_data<T: Clone + Send + Sync + 'static>(&mut self, value: T) -> Option<T>;
88
89 async fn read_body_bytes(&mut self) -> Result<Bytes, Error>;
91
92 #[inline]
94 fn path_segments(&self) -> Vec<&str> {
95 self.request_path().trim_matches('/').split('/').collect()
96 }
97
98 fn new_context(&self) -> Context {
100 #[cfg(feature = "metrics")]
102 {
103 metrics::gauge!("zino_http_requests_in_flight").increment(1.0);
104 metrics::counter!(
105 "zino_http_requests_total",
106 "method" => self.request_method().as_ref().to_owned(),
107 "route" => self.matched_route().into_owned(),
108 )
109 .increment(1);
110 }
111
112 let request_id = self
114 .get_header("x-request-id")
115 .and_then(|s| s.parse().ok())
116 .unwrap_or_else(Uuid::now_v7);
117 let trace_id = self
118 .get_trace_context()
119 .map_or_else(Uuid::now_v7, |t| Uuid::from_u128(t.trace_id()));
120 let session_id = self
121 .get_header("x-session-id")
122 .or_else(|| self.get_header("session_id"))
123 .and_then(|s| s.parse().ok());
124
125 let mut ctx = Context::new(request_id);
127 ctx.set_instance(self.request_path().to_owned());
128 ctx.set_trace_id(trace_id);
129 ctx.set_session_id(session_id);
130
131 #[cfg(feature = "i18n")]
133 {
134 #[cfg(feature = "cookie")]
135 if let Some(cookie) = self.get_cookie("locale") {
136 if let Ok(locale) = cookie.value().parse() {
137 ctx.set_locale(locale);
138 return ctx;
139 }
140 }
141
142 if let Some(locale) = self
143 .get_header("accept-language")
144 .and_then(Intl::select_language)
145 {
146 ctx.set_locale(locale);
147 } else {
148 ctx.set_locale(Intl::default_locale().to_owned());
149 }
150 }
151 ctx
152 }
153
154 #[inline]
156 fn get_trace_context(&self) -> Option<TraceContext> {
157 let traceparent = self.get_header("traceparent")?;
158 let mut trace_context = TraceContext::from_traceparent(traceparent)?;
159 if let Some(tracestate) = self.get_header("tracestate") {
160 *trace_context.trace_state_mut() = TraceState::from_tracestate(tracestate);
161 }
162 Some(trace_context)
163 }
164
165 fn new_trace_context(&self) -> TraceContext {
167 let mut trace_context = self
168 .get_trace_context()
169 .or_else(|| {
170 self.get_context()
171 .map(|ctx| TraceContext::with_trace_id(ctx.trace_id()))
172 })
173 .map(|t| t.child())
174 .unwrap_or_default();
175 trace_context.record_trace_state();
176 trace_context
177 }
178
179 #[cfg(feature = "cookie")]
181 fn new_cookie(
182 &self,
183 name: SharedString,
184 value: SharedString,
185 max_age: Option<Duration>,
186 ) -> Cookie<'static> {
187 let mut cookie_builder = Cookie::build((name, value))
188 .http_only(true)
189 .secure(true)
190 .same_site(SameSite::Lax)
191 .path(self.request_path().to_owned());
192 if let Some(max_age) = max_age.and_then(|d| d.try_into().ok()) {
193 cookie_builder = cookie_builder.max_age(max_age);
194 }
195 cookie_builder.build()
196 }
197
198 #[cfg(feature = "cookie")]
200 fn get_cookie(&self, name: &str) -> Option<Cookie<'_>> {
201 self.get_header("cookie")?.split(';').find_map(|cookie| {
202 if let Some((key, value)) = cookie.split_once('=') {
203 (key == name).then(|| Cookie::new(key, value))
204 } else {
205 None
206 }
207 })
208 }
209
210 #[inline]
212 fn start_time(&self) -> Instant {
213 self.get_context()
214 .map(|ctx| ctx.start_time())
215 .unwrap_or_else(Instant::now)
216 }
217
218 #[inline]
220 fn instance(&self) -> String {
221 self.get_context()
222 .map(|ctx| ctx.instance().to_owned())
223 .unwrap_or_else(|| self.request_path().to_owned())
224 }
225
226 #[inline]
228 fn request_id(&self) -> Uuid {
229 self.get_context()
230 .map(|ctx| ctx.request_id())
231 .unwrap_or_default()
232 }
233
234 #[inline]
236 fn trace_id(&self) -> Uuid {
237 self.get_context()
238 .map(|ctx| ctx.trace_id())
239 .unwrap_or_default()
240 }
241
242 #[inline]
244 fn session_id(&self) -> Option<String> {
245 self.get_context()
246 .and_then(|ctx| ctx.session_id().map(|s| s.to_owned()))
247 }
248
249 #[cfg(feature = "i18n")]
251 #[inline]
252 fn locale(&self) -> Option<LanguageIdentifier> {
253 self.get_context().and_then(|ctx| ctx.locale().cloned())
254 }
255
256 fn data_type(&self) -> Option<&str> {
263 self.get_header("content-type")
264 .map(|content_type| {
265 if let Some((essence, _)) = content_type.split_once(';') {
266 essence
267 } else {
268 content_type
269 }
270 })
271 .map(helper::get_data_type)
272 }
273
274 fn get_param(&self, name: &str) -> Option<&str> {
283 const CAPTURES: [char; 4] = [':', '*', '{', '}'];
284 if let Some(index) = self
285 .matched_route()
286 .split('/')
287 .position(|segment| segment.trim_matches(CAPTURES.as_slice()) == name)
288 {
289 self.request_path().splitn(index + 2, '/').nth(index)
290 } else {
291 None
292 }
293 }
294
295 fn decode_param(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
297 if let Some(value) = self.get_param(name) {
298 percent_encoding::percent_decode_str(value)
299 .decode_utf8()
300 .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
301 } else {
302 Err(Rejection::from_validation_entry(
303 name.to_owned(),
304 warn!("param `{}` does not exist", name),
305 )
306 .context(self))
307 }
308 }
309
310 fn parse_param<T: FromStr<Err: Into<Error>>>(&self, name: &str) -> Result<T, Rejection> {
313 if let Some(param) = self.get_param(name) {
314 percent_encoding::percent_decode_str(param)
315 .decode_utf8_lossy()
316 .parse::<T>()
317 .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
318 } else {
319 Err(Rejection::from_validation_entry(
320 name.to_owned(),
321 warn!("param `{}` does not exist", name),
322 )
323 .context(self))
324 }
325 }
326
327 fn get_query(&self, name: &str) -> Option<&str> {
335 self.get_query_string()?.split('&').find_map(|param| {
336 if let Some((key, value)) = param.split_once('=') {
337 (key == name).then_some(value)
338 } else {
339 None
340 }
341 })
342 }
343
344 fn decode_query(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
346 if let Some(value) = self.get_query(name) {
347 percent_encoding::percent_decode_str(value)
348 .decode_utf8()
349 .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
350 } else {
351 Err(Rejection::from_validation_entry(
352 name.to_owned(),
353 warn!("query value `{}` does not exist", name),
354 )
355 .context(self))
356 }
357 }
358
359 fn parse_query<T: Default + DeserializeOwned>(&self) -> Result<T, Rejection> {
363 if let Some(query) = self.get_query_string() {
364 #[cfg(feature = "jwt")]
365 if let Some(timestamp) = self.get_query("timestamp").and_then(|s| s.parse().ok()) {
366 let duration = DateTime::from_timestamp(timestamp).span_between_now();
367 if duration > zino_auth::default_time_tolerance() {
368 let err = warn!("timestamp `{}` can not be trusted", timestamp);
369 let rejection = Rejection::from_validation_entry("timestamp", err);
370 return Err(rejection.context(self));
371 }
372 }
373 serde_qs::from_str::<T>(query)
374 .map_err(|err| Rejection::from_validation_entry("query", err).context(self))
375 } else {
376 Ok(T::default())
377 }
378 }
379
380 async fn parse_body<T: DeserializeOwned>(&mut self) -> Result<T, Rejection> {
390 let data_type = self.data_type().unwrap_or("form");
391 if data_type.contains('/') {
392 let err = warn!(
393 "deserialization of the data type `{}` is unsupported",
394 data_type
395 );
396 let rejection = Rejection::from_validation_entry("data_type", err).context(self);
397 return Err(rejection);
398 }
399
400 let is_form = data_type == "form";
401 let bytes = self
402 .read_body_bytes()
403 .await
404 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
405 if is_form {
406 serde_qs::from_bytes(&bytes)
407 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
408 } else {
409 serde_json::from_slice(&bytes)
410 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
411 }
412 }
413
414 async fn parse_multipart(&mut self) -> Result<Multipart<'_>, Rejection> {
416 let Some(content_type) = self.get_header("content-type") else {
417 return Err(Rejection::from_validation_entry(
418 "content_type",
419 warn!("invalid `content-type` header"),
420 )
421 .context(self));
422 };
423 match multer::parse_boundary(content_type) {
424 Ok(boundary) => {
425 let result = self.read_body_bytes().await.map_err(|err| err.to_string());
426 let stream = futures::stream::once(async { result });
427 Ok(Multipart::new(stream, boundary))
428 }
429 Err(err) => Err(Rejection::from_validation_entry("boundary", err).context(self)),
430 }
431 }
432
433 async fn parse_file(&mut self) -> Result<NamedFile, Rejection> {
435 let multipart = self.parse_multipart().await?;
436 NamedFile::try_from_multipart(multipart)
437 .await
438 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
439 }
440
441 async fn parse_files(&mut self) -> Result<Vec<NamedFile>, Rejection> {
443 let multipart = self.parse_multipart().await?;
444 NamedFile::try_collect_from_multipart(multipart)
445 .await
446 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
447 }
448
449 async fn parse_form<T: DeserializeOwned>(
451 &mut self,
452 name: &str,
453 ) -> Result<(Option<T>, Vec<NamedFile>), Rejection> {
454 let multipart = self.parse_multipart().await?;
455 helper::parse_form(multipart, name)
456 .await
457 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
458 }
459
460 async fn parse_form_data<T: DeserializeOwned>(
462 &mut self,
463 ) -> Result<(T, Vec<NamedFile>), Rejection> {
464 let multipart = self.parse_multipart().await?;
465 helper::parse_form_data(multipart)
466 .await
467 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
468 }
469
470 #[cfg(feature = "auth")]
475 fn parse_authentication(&self) -> Result<Authentication, Rejection> {
476 let method = self.request_method();
477 let query = self.parse_query::<Map>().unwrap_or_default();
478 let mut authentication = Authentication::new(method.as_ref());
479 let mut validation = Validation::new();
480 if let Some(signature) = query.get_str("signature") {
481 authentication.set_signature(signature.to_owned());
482 if let Some(access_key_id) = query.parse_string("access_key_id") {
483 authentication.set_access_key_id(access_key_id);
484 } else {
485 validation.record("access_key_id", "should be nonempty");
486 }
487 if let Some(Ok(secs)) = query.parse_i64("expires") {
488 if DateTime::now().timestamp() <= secs {
489 let expires = DateTime::from_timestamp(secs);
490 authentication.set_expires(Some(expires));
491 } else {
492 validation.record("expires", "valid period has expired");
493 }
494 } else {
495 validation.record("expires", "invalid timestamp");
496 }
497 if !validation.is_success() {
498 return Err(Rejection::bad_request(validation).context(self));
499 }
500 } else if let Some(authorization) = self.get_header("authorization") {
501 if let Some((service_name, token)) = authorization.split_once(' ') {
502 authentication.set_service_name(service_name);
503 if let Some((access_key_id, signature)) = token.split_once(':') {
504 authentication.set_access_key_id(access_key_id);
505 authentication.set_signature(signature.to_owned());
506 } else {
507 validation.record("authorization", "invalid header value");
508 }
509 } else {
510 validation.record("authorization", "invalid service name");
511 }
512 if !validation.is_success() {
513 return Err(Rejection::bad_request(validation).context(self));
514 }
515 }
516 if let Some(content_md5) = self.get_header("content-md5") {
517 authentication.set_content_md5(content_md5.to_owned());
518 }
519 if let Some(date) = self.get_header("date") {
520 match DateTime::parse_utc_str(date) {
521 Ok(date) => {
522 #[cfg(feature = "jwt")]
523 if date.span_between_now() <= zino_auth::default_time_tolerance() {
524 authentication.set_date_header("date", date);
525 } else {
526 validation.record("date", "untrusted date");
527 }
528 #[cfg(not(feature = "jwt"))]
529 authentication.set_date_header("date", date);
530 }
531 Err(err) => {
532 validation.record_fail("date", err);
533 return Err(Rejection::bad_request(validation).context(self));
534 }
535 }
536 }
537 authentication.set_content_type(self.get_header("content-type").map(|s| s.to_owned()));
538 authentication.set_resource(self.request_path().to_owned(), None);
539 Ok(authentication)
540 }
541
542 #[cfg(feature = "auth")]
546 fn parse_access_key_id(&self) -> Result<AccessKeyId, Rejection> {
547 if let Some(access_key_id) = self.get_query("access_key_id") {
548 Ok(access_key_id.into())
549 } else {
550 let mut validation = Validation::new();
551 if let Some(authorization) = self.get_header("authorization") {
552 if let Some((_, token)) = authorization.split_once(' ') {
553 let access_key_id = if let Some((access_key_id, _)) = token.split_once(':') {
554 access_key_id
555 } else {
556 token
557 };
558 return Ok(access_key_id.into());
559 } else {
560 validation.record("authorization", "invalid service name");
561 }
562 } else {
563 validation.record("authorization", "invalid value to get the access key id");
564 }
565 Err(Rejection::bad_request(validation).context(self))
566 }
567 }
568
569 #[cfg(feature = "auth")]
572 fn parse_security_token(&self, key: &[u8]) -> Result<SecurityToken, Rejection> {
573 use ParseSecurityTokenError::*;
574 let query = self.parse_query::<Map>()?;
575 let mut validation = Validation::new();
576 if let Some(token) = self
577 .get_header("x-security-token")
578 .or_else(|| query.get_str("security_token"))
579 {
580 match SecurityToken::parse_with(token.to_owned(), key) {
581 Ok(security_token) => {
582 if let Some(access_key_id) = query.get_str("access_key_id") {
583 if security_token.access_key_id().as_str() != access_key_id {
584 validation.record("access_key_id", "untrusted access key ID");
585 }
586 }
587 if let Some(Ok(expires)) = query.parse_i64("expires") {
588 if security_token.expires_at().timestamp() != expires {
589 validation.record("expires", "untrusted timestamp");
590 }
591 }
592 if validation.is_success() {
593 return Ok(security_token);
594 }
595 }
596 Err(err) => {
597 let field = match err {
598 DecodeError(_) | InvalidFormat => "security_token",
599 ParseExpiresError(_) | ValidPeriodExpired(_) => "expires",
600 };
601 validation.record_fail(field, err);
602 }
603 }
604 } else {
605 validation.record("security_token", "should be nonempty");
606 }
607 Err(Rejection::bad_request(validation).context(self))
608 }
609
610 #[cfg(feature = "auth")]
613 fn parse_session_id(&self) -> Result<SessionId, Rejection> {
614 self.get_header("x-session-id")
615 .or_else(|| self.get_header("session-id"))
616 .ok_or_else(|| {
617 Rejection::from_validation_entry(
618 "session_id",
619 warn!("a `session-id` or `x-session-id` header is required"),
620 )
621 .context(self)
622 })
623 .and_then(|session_id| {
624 SessionId::parse(session_id).map_err(|err| {
625 Rejection::from_validation_entry("session_id", err).context(self)
626 })
627 })
628 }
629
630 #[cfg(feature = "jwt")]
634 fn parse_jwt_claims<T, K>(&self, key: &K) -> Result<JwtClaims<T>, Rejection>
635 where
636 T: Default + serde::Serialize + DeserializeOwned,
637 K: MACLike,
638 {
639 let (param, mut token) = match self.get_query("access_token") {
640 Some(access_token) => ("access_token", access_token),
641 None => ("authorization", ""),
642 };
643 if let Some(authorization) = self.get_header("authorization") {
644 token = authorization
645 .strip_prefix("Bearer ")
646 .unwrap_or(authorization);
647 } else if cfg!(feature = "cookie") {
648 let value = self.get_header("cookie").and_then(|s| {
649 s.split(';').find_map(|cookie| {
650 if let Some((key, value)) = cookie.split_once('=') {
651 (key == "access_token").then_some(value)
652 } else {
653 None
654 }
655 })
656 });
657 if let Some(access_token) = value {
658 token = access_token;
659 }
660 }
661 if token.is_empty() {
662 let mut validation = Validation::new();
663 validation.record(param, "JWT should be nonempty");
664 return Err(Rejection::bad_request(validation).context(self));
665 }
666
667 let mut options = zino_auth::default_verification_options();
668 options.reject_before = self
669 .get_query("timestamp")
670 .and_then(|s| s.parse().ok())
671 .map(|i| Duration::from_secs(i).into());
672 options.required_nonce = self.get_query("nonce").map(|s| s.to_owned());
673
674 match key.verify_token(token, Some(options)) {
675 Ok(claims) => Ok(claims.into()),
676 Err(err) => {
677 let message = format!("401 Unauthorized: {err}");
678 Err(Rejection::with_message(message).context(self))
679 }
680 }
681 }
682
683 fn query_validation<S>(&self, query: &mut Query) -> Result<Response<S>, Rejection>
686 where
687 Self: Sized,
688 S: ResponseCode,
689 {
690 match self.parse_query() {
691 Ok(data) => {
692 let validation = query.read_map(&data);
693 if validation.is_success() {
694 Ok(Response::with_context(S::OK, self))
695 } else {
696 Err(Rejection::bad_request(validation).context(self))
697 }
698 }
699 Err(rejection) => Err(rejection),
700 }
701 }
702
703 async fn model_validation<M, S>(&mut self, model: &mut M) -> Result<Response<S>, Rejection>
706 where
707 Self: Sized,
708 M: ModelHooks,
709 S: ResponseCode,
710 {
711 let data_type = self.data_type().unwrap_or("form");
712 if data_type.contains('/') {
713 let err = warn!(
714 "deserialization of the data type `{}` is unsupported",
715 data_type
716 );
717 let rejection = Rejection::from_validation_entry("data_type", err).context(self);
718 return Err(rejection);
719 }
720 M::before_extract()
721 .await
722 .map_err(|err| Rejection::from_error(err).context(self))?;
723
724 let is_form = data_type == "form";
725 let bytes = self
726 .read_body_bytes()
727 .await
728 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
729 let extension = self.get_data::<M::Extension>();
730 if is_form {
731 let mut data = serde_qs::from_bytes(&bytes)
732 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
733 match M::before_validation(&mut data, extension.as_ref()).await {
734 Ok(()) => {
735 let validation = model.read_map(&data);
736 model
737 .after_validation(&mut data)
738 .await
739 .map_err(|err| Rejection::from_error(err).context(self))?;
740 if let Some(extension) = extension {
741 model
742 .after_extract(extension)
743 .await
744 .map_err(|err| Rejection::from_error(err).context(self))?;
745 }
746 if validation.is_success() {
747 Ok(Response::with_context(S::OK, self))
748 } else {
749 Err(Rejection::bad_request(validation).context(self))
750 }
751 }
752 Err(err) => Err(Rejection::from_error(err).context(self)),
753 }
754 } else {
755 let mut data = serde_json::from_slice(&bytes)
756 .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
757 match M::before_validation(&mut data, extension.as_ref()).await {
758 Ok(()) => {
759 let validation = model.read_map(&data);
760 model
761 .after_validation(&mut data)
762 .await
763 .map_err(|err| Rejection::from_error(err).context(self))?;
764 if let Some(extension) = extension {
765 model
766 .after_extract(extension)
767 .await
768 .map_err(|err| Rejection::from_error(err).context(self))?;
769 }
770 if validation.is_success() {
771 Ok(Response::with_context(S::OK, self))
772 } else {
773 Err(Rejection::bad_request(validation).context(self))
774 }
775 }
776 Err(err) => Err(Rejection::from_error(err).context(self)),
777 }
778 }
779 }
780
781 async fn fetch(&self, url: &str, options: Option<&Map>) -> Result<reqwest::Response, Error> {
783 let trace_context = self.new_trace_context();
784 Agent::request_builder(url, options)?
785 .header("traceparent", trace_context.traceparent())
786 .header("tracestate", trace_context.tracestate())
787 .send()
788 .await
789 .map_err(Error::from)
790 }
791
792 async fn fetch_json<T: DeserializeOwned>(
795 &self,
796 url: &str,
797 options: Option<&Map>,
798 ) -> Result<T, Error> {
799 let response = self.fetch(url, options).await?.error_for_status()?;
800 let data = if response.headers().has_json_content_type() {
801 response.json().await?
802 } else {
803 let text = response.text().await?;
804 serde_json::from_str(&text)?
805 };
806 Ok(data)
807 }
808
809 #[cfg(feature = "i18n")]
811 fn translate(
812 &self,
813 message: &str,
814 args: Option<FluentArgs>,
815 ) -> Result<SharedString, IntlError> {
816 if let Some(locale) = self.locale() {
817 Intl::translate_with(message, args, &locale)
818 } else {
819 Intl::translate(message, args)
820 }
821 }
822
823 fn subscription(&self) -> Subscription {
825 let mut subscription = self.parse_query::<Subscription>().unwrap_or_default();
826 if subscription.session_id().is_none()
827 && let Some(session_id) = self.session_id()
828 {
829 subscription.set_session_id(Some(session_id));
830 }
831 subscription
832 }
833
834 fn cloud_event(&self, event_type: SharedString, data: JsonValue) -> CloudEvent {
836 let id = self.request_id();
837 let source = self.instance();
838 let mut event = CloudEvent::new(id, source, event_type);
839 if let Some(session_id) = self.session_id() {
840 event.set_session_id(session_id);
841 }
842 event.set_data(data);
843 event
844 }
845}