1use std::borrow::Cow;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use chrono::DateTime;
8use chrono::Utc;
9use http_cache_stream_reqwest::Cache;
10use http_cache_stream_reqwest::storage::DefaultCacheStorage;
11use reqwest::Body;
12use reqwest::Request;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use reqwest::header::HeaderValue;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::broadcast;
21use tracing::debug;
22use url::Url;
23
24use crate::BLOCK_SIZE_THRESHOLD;
25use crate::Config;
26use crate::Error;
27use crate::HttpClient;
28use crate::ONE_MEBIBYTE;
29use crate::Result;
30use crate::S3AuthConfig;
31use crate::TransferEvent;
32use crate::USER_AGENT;
33use crate::UrlExt as _;
34use crate::backend::StorageBackend;
35use crate::backend::Upload;
36use crate::backend::auth::RequestSigner;
37use crate::backend::auth::SignatureProvider;
38use crate::backend::auth::sha256_hex_string;
39use crate::streams::ByteStream;
40use crate::streams::TransferStream;
41
/// The root domain that AWS S3 endpoint URLs are expected to end with.
const AWS_ROOT_DOMAIN: &str = "amazonaws.com";

/// The root domain used in place of the AWS one when `use_localstack` is enabled.
const LOCALSTACK_ROOT_DOMAIN: &str = "localhost.localstack.cloud";

/// The region used when rewriting `s3://` URLs and no region is configured.
const DEFAULT_REGION: &str = "us-east-1";

/// The upper bound on the number of blocks (parts) considered when choosing a block size.
const MAX_PARTS: u64 = 10000;

/// The smallest block (part) size returned by automatic block size selection.
const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;

/// The largest permitted block (part) size: 1024 times the minimum part size.
const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;

/// The largest supported source file size: 1024 times the maximum part size.
const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;

/// The name of the request header that carries the request timestamp.
const AWS_DATE_HEADER: &str = "x-amz-date";

/// The name of the request header that carries the SHA-256 hex digest of the payload.
const AWS_CONTENT_SHA256_HEADER: &str = "x-amz-content-sha256";
69
/// Errors specific to the S3 storage backend.
#[derive(Debug, thiserror::Error)]
pub enum S3Error {
    /// The configured block size is larger than `MAX_PART_SIZE`.
    #[error("S3 block size cannot exceed {MAX_PART_SIZE} bytes")]
    InvalidBlockSize,
    /// The source file cannot be split into `MAX_PARTS` blocks of at most `MAX_PART_SIZE` bytes.
    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
    MaximumSizeExceeded,
    /// A URL with the `s3` scheme could not be rewritten into an HTTP(S) endpoint URL.
    #[error("invalid URL with `s3` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// The URL does not name a bucket in its path.
    #[error("URL is missing the bucket in the path")]
    MissingBucket,
    /// Signing a request with the configured secret access key failed.
    #[error("invalid S3 secret access key")]
    InvalidSecretAccessKey,
    /// A part upload succeeded but the response carried no usable `ETag` header.
    #[error("response from server was missing an ETag header")]
    ResponseMissingETag,
    /// The bucket name could not be used as part of the URL's host.
    #[error("the bucket name specified in the URL is invalid")]
    InvalidBucketName,
    /// The body of a server response could not be deserialized from XML.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The HTTP status of the response whose body failed to deserialize.
        status: reqwest::StatusCode,
        /// The underlying XML deserialization error.
        error: serde_xml_rs::Error,
    },
}
103
/// A single `Contents` entry of a bucket listing response.
#[derive(Debug, Deserialize)]
pub struct Content {
    /// The object key, read from the `Key` element.
    #[serde(rename = "Key")]
    pub key: String,
}
111
/// The deserialized `ListBucketResult` document returned when listing a bucket.
#[derive(Debug, Deserialize)]
#[serde(rename = "ListBucketResult")]
pub struct ListBucketResult {
    /// The listed objects; empty when the response has no `Contents` elements.
    #[serde(default, rename = "Contents")]
    pub contents: Vec<Content>,
    /// The `NextContinuationToken` for requesting the next page, if the response has one.
    #[serde(rename = "NextContinuationToken", default)]
    pub token: Option<String>,
}
123
/// The deserialized `InitiateMultipartUploadResult` document returned when an upload is created.
#[derive(Default, Deserialize)]
#[serde(rename = "InitiateMultipartUploadResult")]
pub struct InitiateMultipartUploadResult {
    /// The identifier of the newly created upload, read from the `UploadId` element.
    #[serde(rename = "UploadId")]
    pub upload_id: String,
}
132
/// Supplies the S3-specific values needed by a `RequestSigner` to sign a request.
pub struct S3SignatureProvider<'a> {
    /// The region the request is being sent to.
    region: &'a str,
    /// The credentials (access key id and secret access key) used for signing.
    auth: &'a S3AuthConfig,
}
140
/// Provides the fixed AWS signing parameters together with the borrowed region and credentials.
impl SignatureProvider for S3SignatureProvider<'_> {
    /// The name of the signing algorithm.
    fn algorithm(&self) -> &str {
        "AWS4-HMAC-SHA256"
    }

    /// The prefix associated with the secret key when signing.
    fn secret_key_prefix(&self) -> &str {
        "AWS4"
    }

    /// The request type string used when signing.
    fn request_type(&self) -> &str {
        "aws4_request"
    }

    /// The region supplied when the provider was constructed.
    fn region(&self) -> &str {
        self.region
    }

    /// The service name used when signing; always `s3`.
    fn service(&self) -> &str {
        "s3"
    }

    /// The name of the header that carries the request date.
    fn date_header_name(&self) -> &str {
        AWS_DATE_HEADER
    }

    /// The name of the header that carries the payload hash.
    fn content_hash_header_name(&self) -> &str {
        AWS_CONTENT_SHA256_HEADER
    }

    /// The access key id from the authentication configuration.
    fn access_key_id(&self) -> &str {
        &self.auth.access_key_id
    }

    /// The secret access key, exposed from its secret wrapper for signing.
    fn secret_access_key(&self) -> &str {
        self.auth.secret_access_key.expose_secret()
    }
}
178
179fn append_authentication_header(
181 auth: &S3AuthConfig,
182 date: DateTime<Utc>,
183 request: &mut Request,
184) -> Result<()> {
185 let signer = RequestSigner::new(S3SignatureProvider {
186 region: request.url().region(),
187 auth,
188 });
189 let auth = signer
190 .sign(date, request)
191 .ok_or(S3Error::InvalidSecretAccessKey)?;
192 request.headers_mut().append(
193 header::AUTHORIZATION,
194 HeaderValue::try_from(auth).expect("value should be valid"),
195 );
196 Ok(())
197}
198
/// Helpers for extracting S3-specific components from a URL.
trait UrlExt {
    /// Extracts the region from the URL's domain.
    ///
    /// # Panics
    ///
    /// Panics if the URL has no domain or the domain has too few labels to contain a region.
    fn region(&self) -> &str;

    /// Extracts the bucket name and the object path from the URL.
    ///
    /// # Panics
    ///
    /// Panics if the URL has no domain or no bucket can be found in it.
    fn bucket_and_path(&self) -> (&str, &str);
}
215
216impl UrlExt for Url {
217 fn region(&self) -> &str {
218 let domain = self.domain().expect("URL should have domain");
219
220 if domain.starts_with("s3.") || domain.starts_with("S3.") {
221 let mut parts = domain.splitn(3, '.');
223 match (parts.next(), parts.next()) {
224 (_, Some(region)) => region,
225 _ => panic!("invalid S3 URL"),
226 }
227 } else {
228 let mut parts = domain.splitn(4, '.');
230
231 match (parts.next(), parts.next(), parts.next()) {
232 (_, _, Some(region)) => region,
233 _ => panic!("invalid S3 URL"),
234 }
235 }
236 }
237
238 fn bucket_and_path(&self) -> (&str, &str) {
239 let domain = self.domain().expect("URL should have domain");
240
241 if domain.starts_with("s3.") || domain.starts_with("S3.") {
242 let bucket = self
244 .path_segments()
245 .expect("URL should have path")
246 .next()
247 .expect("URL should have at least one path segment");
248
249 (
250 bucket,
251 self.path()
252 .strip_prefix('/')
253 .unwrap()
254 .strip_prefix(bucket)
255 .unwrap(),
256 )
257 } else {
258 let Some((bucket, _)) = domain.split_once('.') else {
260 panic!("URL domain does not contain a bucket");
261 };
262
263 (bucket, self.path())
264 }
265 }
266}
267
/// Converts an unsuccessful server response into an error.
trait ResponseExt {
    /// Consumes the response and builds the `Error` that describes it.
    async fn into_error(self) -> Error;
}
273
274impl ResponseExt for Response {
275 async fn into_error(self) -> Error {
276 #[derive(Default, Deserialize)]
278 #[serde(rename = "Error")]
279 struct ErrorResponse {
280 #[serde(rename = "Message")]
282 message: String,
283 }
284
285 let status = self.status();
286
287 if status == StatusCode::MOVED_PERMANENTLY {
289 return Error::Server {
290 status,
291 message: "the AWS region being used may not be the correct region for the storage \
292 bucket"
293 .into(),
294 };
295 }
296
297 let text: String = match self.text().await {
298 Ok(text) => text,
299 Err(e) => return e.into(),
300 };
301
302 if text.is_empty() {
303 return Error::Server {
304 status,
305 message: text,
306 };
307 }
308
309 let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
310 Ok(response) => response.message,
311 Err(e) => {
312 return S3Error::UnexpectedResponse { status, error: e }.into();
313 }
314 };
315
316 Error::Server { status, message }
317 }
318}
319
/// A part that has been uploaded, serialized as a `Part` element when the upload is finalized.
#[derive(Default, Clone, Serialize)]
#[serde(rename = "Part")]
pub struct S3UploadPart {
    /// The part number, serialized as `PartNumber`.
    #[serde(rename = "PartNumber")]
    number: u64,
    /// The entity tag the server returned for the part, serialized as `ETag`.
    #[serde(rename = "ETag")]
    etag: String,
}
331
/// An in-progress multipart upload to S3.
pub struct S3Upload {
    /// The shared configuration, including the optional S3 credentials.
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The URL of the object being uploaded.
    url: Url,
    /// The upload identifier sent as the `uploadId` query parameter.
    id: String,
    /// The optional channel handed to the transfer stream that wraps each part's body.
    events: Option<broadcast::Sender<TransferEvent>>,
}
345
346impl Upload for S3Upload {
347 type Part = S3UploadPart;
348
349 async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Option<Self::Part>> {
350 debug!(
353 "sending PUT request for block {block} of `{url}`",
354 url = self.url.display()
355 );
356
357 let mut url = self.url.clone();
358
359 {
360 let mut pairs = url.query_pairs_mut();
361 pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
362 pairs.append_pair("uploadId", &self.id);
363 }
364
365 let digest = sha256_hex_string(&bytes);
366 let length = bytes.len();
367 let body = Body::wrap_stream(TransferStream::new(
368 ByteStream::new(bytes),
369 id,
370 block,
371 0,
372 self.events.clone(),
373 ));
374
375 let date = Utc::now();
376 let mut request = self
377 .client
378 .put(url)
379 .header(header::USER_AGENT, USER_AGENT)
380 .header(header::CONTENT_LENGTH, length)
381 .header(header::CONTENT_TYPE, "application/octet-stream")
382 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
383 .header(AWS_CONTENT_SHA256_HEADER, &digest)
384 .body(body)
385 .build()?;
386
387 if let Some(auth) = &self.config.s3.auth {
388 append_authentication_header(auth, date, &mut request)?;
389 }
390
391 let response = self.client.execute(request).await?;
392 if !response.status().is_success() {
393 return Err(response.into_error().await);
394 }
395
396 let etag = response
397 .headers()
398 .get(header::ETAG)
399 .and_then(|v| v.to_str().ok())
400 .ok_or(S3Error::ResponseMissingETag)?;
401
402 Ok(Some(S3UploadPart {
403 number: block + 1,
404 etag: etag.to_string(),
405 }))
406 }
407
408 async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
409 #[derive(Serialize)]
413 #[serde(rename = "CompleteMultipartUpload")]
414 struct CompleteUpload<'a> {
415 #[serde(rename = "Part")]
417 parts: &'a [S3UploadPart],
418 }
419
420 debug!(
421 "sending POST request to finalize upload of `{url}`",
422 url = self.url.display()
423 );
424
425 let mut url = self.url.clone();
426
427 {
428 let mut pairs = url.query_pairs_mut();
429 pairs.append_pair("uploadId", &self.id);
430 }
431
432 let body = serde_xml_rs::SerdeXml::new()
433 .default_namespace("http://s3.amazonaws.com/doc/2006-03-01/")
434 .to_string(&CompleteUpload { parts })
435 .expect("should serialize");
436
437 let date = Utc::now();
438 let mut request = self
439 .client
440 .post(url)
441 .header(header::USER_AGENT, USER_AGENT)
442 .header(header::CONTENT_LENGTH, body.len())
443 .header(header::CONTENT_TYPE, "application/xml")
444 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
445 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
446 .body(body)
447 .build()?;
448
449 if let Some(auth) = &self.config.s3.auth {
450 append_authentication_header(auth, date, &mut request)?;
451 }
452
453 let response = self.client.execute(request).await?;
454 if !response.status().is_success() {
455 return Err(response.into_error().await);
456 }
457
458 Ok(())
459 }
460}
461
/// The storage backend for S3.
pub struct S3StorageBackend {
    /// The shared configuration, handed to every upload the backend creates.
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The optional channel for transfer events, handed to every upload the backend creates.
    events: Option<broadcast::Sender<TransferEvent>>,
}
471
472impl S3StorageBackend {
473 pub fn new(
475 config: Config,
476 client: HttpClient,
477 events: Option<broadcast::Sender<TransferEvent>>,
478 ) -> Self {
479 Self {
480 config: Arc::new(config),
481 client,
482 events,
483 }
484 }
485}
486
487impl StorageBackend for S3StorageBackend {
488 type Upload = S3Upload;
489
    /// Returns the configuration this backend was created with.
    fn config(&self) -> &Config {
        &self.config
    }
493
    /// Returns the HTTP client's cache, if it has one.
    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }
497
    /// Returns the optional channel used for sending transfer events.
    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }
501
502 fn block_size(&self, file_size: u64) -> Result<u64> {
503 const BLOCK_COUNT_INCREMENT: u64 = 50;
505
506 if let Some(size) = self.config.block_size {
508 if size > MAX_PART_SIZE {
509 return Err(S3Error::InvalidBlockSize.into());
510 }
511
512 return Ok(size);
513 }
514
515 let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
517 while num_blocks < MAX_PARTS {
518 let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
519 if block_size <= BLOCK_SIZE_THRESHOLD {
520 return Ok(block_size.max(MIN_PART_SIZE));
521 }
522
523 num_blocks += BLOCK_COUNT_INCREMENT;
524 }
525
526 let block_size: u64 = file_size.div_ceil(MAX_PARTS);
529 if block_size > MAX_PART_SIZE {
530 return Err(S3Error::MaximumSizeExceeded.into());
531 }
532
533 Ok(block_size)
534 }
535
536 fn is_supported_url(config: &Config, url: &Url) -> bool {
537 match url.scheme() {
538 "s3" => true,
539 "http" | "https" => {
540 let Some(domain) = url.domain() else {
541 return false;
542 };
543
544 if domain.starts_with("s3.") || domain.starts_with("S3.") {
545 let domain = &domain[3..];
547 let Some((region, domain)) = domain.split_once('.') else {
548 return false;
549 };
550
551 !region.is_empty()
553 && (domain.eq_ignore_ascii_case(AWS_ROOT_DOMAIN)
554 || (config.s3.use_localstack
555 && domain.eq_ignore_ascii_case(LOCALSTACK_ROOT_DOMAIN)))
556 && url
557 .path_segments()
558 .map(|mut s| s.nth(1).is_some())
559 .unwrap_or(false)
560 } else {
561 let mut parts = domain.splitn(4, '.');
563 match (parts.next(), parts.next(), parts.next(), parts.next()) {
564 (Some(bucket), Some(service), Some(region), Some(domain)) => {
565 !bucket.is_empty()
567 && !region.is_empty()
568 && service.eq_ignore_ascii_case("s3")
569 && (domain.eq_ignore_ascii_case(AWS_ROOT_DOMAIN)
570 || (config.s3.use_localstack
571 && domain.eq_ignore_ascii_case(LOCALSTACK_ROOT_DOMAIN)))
572 && url
573 .path_segments()
574 .map(|mut s| s.next().is_some())
575 .unwrap_or(false)
576 }
577 _ => false,
578 }
579 }
580 }
581 _ => false,
582 }
583 }
584
585 fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
586 match url.scheme() {
587 "s3" => {
588 let region = config.s3.region.as_deref().unwrap_or(DEFAULT_REGION);
589 let bucket = url.host_str().ok_or(S3Error::InvalidScheme)?;
590 let path = url.path();
591
592 if url.path() == "/" {
593 return Err(S3Error::InvalidScheme.into());
594 }
595
596 let (scheme, root, port) = if config.s3.use_localstack {
597 ("http", LOCALSTACK_ROOT_DOMAIN, ":4566")
598 } else {
599 ("https", AWS_ROOT_DOMAIN, "")
600 };
601
602 match (url.query(), url.fragment()) {
603 (None, None) => format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}"),
604 (None, Some(fragment)) => {
605 format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}#{fragment}")
606 }
607 (Some(query), None) => {
608 format!("{scheme}://{bucket}.s3.{region}.{root}{port}{path}?{query}")
609 }
610 (Some(query), Some(fragment)) => {
611 format!(
612 "{scheme}://{bucket}.s3.{region}.{root}{port}{path}?{query}#{fragment}"
613 )
614 }
615 }
616 .parse()
617 .map(Cow::Owned)
618 .map_err(|_| S3Error::InvalidScheme.into())
619 }
620 _ => Ok(Cow::Borrowed(url)),
621 }
622 }
623
624 fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
625 {
627 let mut existing = url.path_segments_mut().expect("url should have path");
628 existing.pop_if_empty();
629 existing.extend(segments);
630 }
631
632 Ok(url)
633 }
634
635 async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
636 debug_assert!(
637 Self::is_supported_url(&self.config, &url),
638 "{url} is not a supported S3 URL",
639 url = url.as_str()
640 );
641
642 debug!("sending HEAD request for `{url}`", url = url.display());
643
644 let date = Utc::now();
645 let mut request = self
646 .client
647 .head(url)
648 .header(header::USER_AGENT, USER_AGENT)
649 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
650 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
651 .build()?;
652
653 if let Some(auth) = &self.config.s3.auth {
654 append_authentication_header(auth, date, &mut request)?;
655 }
656
657 let response = self.client.execute(request).await?;
658 if !response.status().is_success() {
659 if !must_exist && response.status() == StatusCode::NOT_FOUND {
661 return Ok(response);
662 }
663
664 return Err(response.into_error().await);
665 }
666
667 Ok(response)
668 }
669
670 async fn get(&self, url: Url) -> Result<Response> {
671 debug_assert!(
672 Self::is_supported_url(&self.config, &url),
673 "{url} is not a supported S3 URL",
674 url = url.as_str()
675 );
676
677 debug!("sending GET request for `{url}`", url = url.display());
678
679 let date = Utc::now();
680 let mut request = self
681 .client
682 .get(url)
683 .header(header::USER_AGENT, USER_AGENT)
684 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
685 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
686 .build()?;
687
688 if let Some(auth) = &self.config.s3.auth {
689 append_authentication_header(auth, date, &mut request)?;
690 }
691
692 let response = self.client.execute(request).await?;
693 if !response.status().is_success() {
694 return Err(response.into_error().await);
695 }
696
697 Ok(response)
698 }
699
700 async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
701 debug_assert!(
702 Self::is_supported_url(&self.config, &url),
703 "{url} is not a supported S3 URL",
704 url = url.as_str()
705 );
706
707 debug!(
708 "sending GET request at offset {offset} for `{url}`",
709 url = url.display(),
710 );
711
712 let date = Utc::now();
713
714 let mut request = self
715 .client
716 .get(url)
717 .header(header::USER_AGENT, USER_AGENT)
718 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
719 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
720 .header(header::RANGE, format!("bytes={offset}-"))
721 .header(header::IF_MATCH, etag)
722 .build()?;
723
724 if let Some(auth) = &self.config.s3.auth {
725 append_authentication_header(auth, date, &mut request)?;
726 }
727
728 let response = self.client.execute(request).await?;
729 let status = response.status();
730
731 if status == StatusCode::PRECONDITION_FAILED {
733 return Err(Error::RemoteContentModified);
734 }
735
736 if !status.is_success() {
738 return Err(response.into_error().await);
739 }
740
741 if status != StatusCode::PARTIAL_CONTENT {
743 return Err(Error::RemoteContentModified);
744 }
745
746 Ok(response)
747 }
748
749 async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
750 debug_assert!(
753 Self::is_supported_url(&self.config, &url),
754 "{url} is not a supported S3 URL",
755 url = url.as_str()
756 );
757
758 debug!("walking `{url}` as a directory", url = url.display());
759
760 let (bucket, path) = url.bucket_and_path();
761
762 let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
764 prefix.push('/');
765
766 let domain = url.domain().expect("URL should have domain");
768 if domain.starts_with("s3") || domain.starts_with("S3") {
769 url.set_host(Some(&format!("{bucket}.{domain}")))
771 .map_err(|_| S3Error::InvalidBucketName)?;
772 }
773
774 url.set_path("/");
775
776 {
777 let mut pairs = url.query_pairs_mut();
778 pairs.append_pair("list-type", "2");
780 pairs.append_pair("prefix", &prefix);
782 }
783
784 let date = Utc::now();
785 let mut token = String::new();
786 let mut paths = Vec::new();
787 loop {
788 let mut url = url.clone();
789 if !token.is_empty() {
790 url.query_pairs_mut()
791 .append_pair("continuation-token", &token);
792 }
793
794 let mut request = self
796 .client
797 .get(url)
798 .header(header::USER_AGENT, USER_AGENT)
799 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
800 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
801 .build()?;
802
803 if let Some(auth) = &self.config.s3.auth {
804 append_authentication_header(auth, date, &mut request)?;
805 }
806
807 let response = self.client.execute(request).await?;
808
809 let status = response.status();
810 if !status.is_success() {
811 return Err(response.into_error().await);
812 }
813
814 let text = response.text().await?;
815 let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
816 Ok(response) => response,
817 Err(e) => {
818 return Err(S3Error::UnexpectedResponse { status, error: e }.into());
819 }
820 };
821
822 if paths.is_empty()
825 && results.contents.len() == 1
826 && results.token.is_none()
827 && let Some("") = results.contents[0].key.strip_prefix(&prefix)
828 {
829 return Ok(paths);
830 }
831
832 paths.extend(
833 results
834 .contents
835 .into_iter()
836 .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
837 );
838
839 token = results.token.unwrap_or_default();
840 if token.is_empty() {
841 break;
842 }
843 }
844
845 Ok(paths)
846 }
847
848 async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
849 debug_assert!(
852 Self::is_supported_url(&self.config, &url),
853 "{url} is not a supported S3 URL",
854 url = url.as_str()
855 );
856
857 if !self.config.overwrite {
861 let response = self.head(url.clone(), false).await?;
862 if response.status() != StatusCode::NOT_FOUND {
863 return Err(Error::RemoteDestinationExists(url));
864 }
865 }
866
867 debug!("sending POST request for `{url}`", url = url.display());
868
869 let mut create = url.clone();
870 create.query_pairs_mut().append_key_only("uploads");
871
872 let date = Utc::now();
873 let mut request = self
874 .client
875 .post(create)
876 .header(header::USER_AGENT, USER_AGENT)
877 .header(AWS_DATE_HEADER, date.format("%Y%m%dT%H%M%SZ").to_string())
878 .header(AWS_CONTENT_SHA256_HEADER, sha256_hex_string([]))
879 .build()?;
880
881 if let Some(auth) = &self.config.s3.auth {
882 append_authentication_header(auth, date, &mut request)?;
883 }
884
885 let response = self.client.execute(request).await?;
886 let status = response.status();
887 if !status.is_success() {
888 return Err(response.into_error().await);
889 }
890
891 let text: String = match response.text().await {
892 Ok(text) => text,
893 Err(e) => return Err(e.into()),
894 };
895
896 let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
897 Ok(response) => response.upload_id,
898 Err(e) => {
899 return Err(S3Error::UnexpectedResponse { status, error: e }.into());
900 }
901 };
902
903 Ok(S3Upload {
904 config: self.config.clone(),
905 client: self.client.clone(),
906 url,
907 id,
908 events: self.events.clone(),
909 })
910 }
911}