Skip to main content

opcua_client/session/services/
session.rs

1use std::{sync::Arc, time::Duration};
2
3use opcua_core::{
4    comms::url::hostname_from_url, sync::RwLock, trace_read_lock, trace_write_lock, ResponseMessage,
5};
6use opcua_crypto::{
7    self, legacy_encrypt_secret, random, CertificateStore, PKey, SecurityPolicy, X509,
8};
9use opcua_types::{
10    ActivateSessionRequest, ActivateSessionResponse, AnonymousIdentityToken,
11    ApplicationDescription, ByteString, CancelRequest, CancelResponse, CloseSessionRequest,
12    CloseSessionResponse, CreateSessionRequest, CreateSessionResponse, EndpointDescription, Error,
13    ExtensionObject, IntegerId, IssuedIdentityToken, MessageSecurityMode, NodeId, SignatureData,
14    SignedSoftwareCertificate, StatusCode, UAString, UserNameIdentityToken, UserTokenType,
15    X509IdentityToken,
16};
17use rsa::RsaPrivateKey;
18use tracing::error;
19
20use crate::{
21    session::{
22        process_service_result, process_unexpected_response,
23        request_builder::{builder_base, builder_error, RequestHeaderBuilder},
24    },
25    AsyncSecureChannel, IdentityToken, Session, UARequest,
26};
27
28#[derive(Clone)]
29/// Sends a [`CreateSessionRequest`] to the server, returning the session id of the created
30/// session. Internally, the session will store the authentication token which is used for requests
31/// subsequent to this call.
32///
33/// See OPC UA Part 4 - Services 5.6.2 for complete description of the service and error responses.
34///
35/// Note that in order to use the session you will need to store the auth token and
36/// use that in subsequent requests.
37///
38/// Note: Avoid calling this on sessions managed by the [`Session`] type. Session creation
39/// is handled automatically as part of connect/reconnect logic.
40pub struct CreateSession<'a> {
41    client_description: ApplicationDescription,
42    server_uri: UAString,
43    endpoint_url: UAString,
44    session_name: UAString,
45    client_certificate: Option<X509>,
46    session_timeout: f64,
47    max_response_message_size: u32,
48    certificate_store: &'a RwLock<CertificateStore>,
49    endpoint: &'a EndpointDescription,
50    nonce_length: usize,
51
52    header: RequestHeaderBuilder,
53}
54
55builder_base!(CreateSession<'a>);
56
57impl<'a> CreateSession<'a> {
58    /// Create a new `CreateSession` request on the given session.
59    ///
60    /// Crate private since there is no way to safely use this.
61    pub(crate) fn new(session: &'a Session) -> Self {
62        Self {
63            endpoint_url: session.endpoint_info().endpoint.endpoint_url.clone(),
64            server_uri: UAString::null(),
65            client_description: session.application_description.clone(),
66            session_name: session.session_name.clone(),
67            client_certificate: session.channel.read_own_certificate(),
68            endpoint: &session.endpoint_info().endpoint,
69            certificate_store: session.channel.certificate_store(),
70            session_timeout: session.session_timeout,
71            max_response_message_size: 0,
72            nonce_length: session.session_nonce_length,
73            header: RequestHeaderBuilder::new_from_session(session),
74        }
75    }
76
77    /// Create a new `CreateSession` request with the given data.
78    pub fn new_manual(
79        certificate_store: &'a RwLock<CertificateStore>,
80        endpoint: &'a EndpointDescription,
81        session_id: u32,
82        timeout: Duration,
83        auth_token: NodeId,
84        request_handle: IntegerId,
85    ) -> Self {
86        Self {
87            endpoint_url: UAString::null(),
88            server_uri: UAString::null(),
89            client_description: ApplicationDescription::default(),
90            session_name: UAString::null(),
91            client_certificate: None,
92            session_timeout: 0.0,
93            max_response_message_size: 0,
94            certificate_store,
95            endpoint,
96            nonce_length: 32,
97            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
98        }
99    }
100
101    /// Set the client description.
102    pub fn client_description(mut self, desc: impl Into<ApplicationDescription>) -> Self {
103        self.client_description = desc.into();
104        self
105    }
106
107    /// Set the server URI.
108    pub fn server_uri(mut self, server_uri: impl Into<UAString>) -> Self {
109        self.server_uri = server_uri.into();
110        self
111    }
112
113    /// Set the target endpoint URL.
114    pub fn endpoint_url(mut self, endpoint_url: impl Into<UAString>) -> Self {
115        self.endpoint_url = endpoint_url.into();
116        self
117    }
118
119    /// Set the session name.
120    pub fn session_name(mut self, session_name: impl Into<UAString>) -> Self {
121        self.session_name = session_name.into();
122        self
123    }
124
125    /// Set the client certificate.
126    pub fn client_certificate(mut self, client_certificate: X509) -> Self {
127        self.client_certificate = Some(client_certificate);
128        self
129    }
130
131    /// Load the client certificate from the certificate store.
132    pub fn client_cert_from_store(mut self, certificate_store: &RwLock<CertificateStore>) -> Self {
133        let cert_store = trace_read_lock!(certificate_store);
134        self.client_certificate = cert_store.read_own_cert().ok();
135        self
136    }
137
138    /// Set the timeout for the session.
139    pub fn session_timeout(mut self, session_timeout: f64) -> Self {
140        self.session_timeout = session_timeout;
141        self
142    }
143
144    /// Set the requested maximum response message size.
145    pub fn max_response_message_size(mut self, max_response_message_size: u32) -> Self {
146        self.max_response_message_size = max_response_message_size;
147        self
148    }
149
150    /// Set the length of the client nonce used to generate this request.
151    pub fn nonce_length(mut self, nonce_length: usize) -> Self {
152        self.nonce_length = nonce_length;
153        self
154    }
155}
156
157impl UARequest for CreateSession<'_> {
158    type Out = CreateSessionResponse;
159
160    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
161    where
162        Self: 'a,
163    {
164        let client_nonce = random::byte_string(self.nonce_length);
165
166        let request = CreateSessionRequest {
167            request_header: self.header.header,
168            client_description: self.client_description,
169            server_uri: self.server_uri,
170            endpoint_url: self.endpoint_url,
171            session_name: self.session_name,
172            client_nonce: client_nonce.clone(),
173            client_certificate: self
174                .client_certificate
175                .as_ref()
176                .map(|v| v.as_byte_string())
177                .unwrap_or_default(),
178            requested_session_timeout: self.session_timeout,
179            max_response_message_size: self.max_response_message_size,
180        };
181        let response = channel.send(request, self.header.timeout).await?;
182
183        if let ResponseMessage::CreateSession(response) = response {
184            tracing::debug!("create_session, success");
185            process_service_result(&response.response_header)?;
186
187            let security_policy = channel.security_policy();
188
189            if security_policy != SecurityPolicy::None {
190                if let Ok(server_certificate) =
191                    opcua_crypto::X509::from_byte_string(&response.server_certificate)
192                {
193                    // Validate server certificate against hostname and application_uri
194                    let hostname = hostname_from_url(self.endpoint.endpoint_url.as_ref())
195                        .map_err(|_| StatusCode::BadUnexpectedError)?;
196                    let application_uri = self.endpoint.server.application_uri.as_ref();
197
198                    let certificate_store = trace_write_lock!(self.certificate_store);
199                    certificate_store.validate_or_reject_application_instance_cert(
200                        &server_certificate,
201                        security_policy,
202                        Some(&hostname),
203                        Some(application_uri),
204                    )?;
205
206                    opcua_crypto::verify_signature_data(
207                        &response.server_signature,
208                        security_policy,
209                        &server_certificate,
210                        self.client_certificate
211                            .as_ref()
212                            .ok_or(StatusCode::BadCertificateInvalid)?,
213                        client_nonce.as_ref(),
214                    )?;
215                } else {
216                    return Err(StatusCode::BadCertificateInvalid);
217                }
218            }
219
220            channel.update_from_created_session(
221                &response.server_nonce,
222                &response.server_certificate,
223                &response.authentication_token,
224            )?;
225
226            Ok(*response)
227        } else {
228            tracing::error!("create_session failed");
229            Err(process_unexpected_response(response))
230        }
231    }
232}
233
234#[derive(Debug, Clone)]
235/// Sends an [`ActivateSessionRequest`] to the server to activate the session tied to
236/// the secure channel.
237///
238/// See OPC UA Part 4 - Services 5.6.3 for complete description of the service and error responses.
239///
240/// Note: Avoid calling this on sessions managed by the [`Session`] type. Session activation
241/// is handled automatically as part of connect/reconnect logic.
242pub struct ActivateSession {
243    identity_token: IdentityToken,
244    private_key: Option<PKey<RsaPrivateKey>>,
245    locale_ids: Vec<UAString>,
246    client_software_certificates: Vec<SignedSoftwareCertificate>,
247    endpoint: EndpointDescription,
248
249    header: RequestHeaderBuilder,
250}
251
252builder_base!(ActivateSession);
253
254impl ActivateSession {
255    /// Create a new `ActivateSession` request.
256    ///
257    /// Crate private since there is no way to safely use this.
258    pub(crate) fn new(session: &Session) -> Self {
259        Self {
260            identity_token: session.endpoint_info().user_identity_token.clone(),
261            private_key: session.channel.read_own_private_key(),
262            locale_ids: session
263                .endpoint_info()
264                .preferred_locales
265                .iter()
266                .map(UAString::from)
267                .collect(),
268            client_software_certificates: Vec::new(),
269            endpoint: session.endpoint_info().endpoint.clone(),
270            header: RequestHeaderBuilder::new_from_session(session),
271        }
272    }
273
274    /// Create a new `ActivateSession` request.
275    pub fn new_manual(
276        endpoint: EndpointDescription,
277        session_id: u32,
278        timeout: Duration,
279        auth_token: NodeId,
280        request_handle: IntegerId,
281    ) -> Self {
282        Self {
283            identity_token: IdentityToken::Anonymous,
284            private_key: None,
285            locale_ids: Vec::new(),
286            client_software_certificates: Vec::new(),
287            endpoint,
288            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
289        }
290    }
291
292    /// Set the identity token.
293    pub fn identity_token(mut self, identity_token: IdentityToken) -> Self {
294        self.identity_token = identity_token;
295        self
296    }
297
298    /// Set the client private key.
299    pub fn private_key(mut self, private_key: PKey<RsaPrivateKey>) -> Self {
300        self.private_key = Some(private_key);
301        self
302    }
303
304    /// Set the requested list of locales.
305    pub fn locale_ids(mut self, locale_ids: Vec<UAString>) -> Self {
306        self.locale_ids = locale_ids;
307        self
308    }
309
310    /// Add a requested locale with the given ID.
311    pub fn locale_id(mut self, locale_id: impl Into<UAString>) -> Self {
312        self.locale_ids.push(locale_id.into());
313        self
314    }
315
316    /// Set the client software certificates.
317    pub fn client_software_certificates(
318        mut self,
319        certificates: Vec<SignedSoftwareCertificate>,
320    ) -> Self {
321        self.client_software_certificates = certificates;
322        self
323    }
324
325    /// Add a client software certificate.
326    pub fn client_software_certificate(mut self, certificate: SignedSoftwareCertificate) -> Self {
327        self.client_software_certificates.push(certificate);
328        self
329    }
330
331    async fn user_identity_token(
332        &self,
333        remote_nonce: &ByteString,
334        remote_cert: &Option<X509>,
335        security_mode: MessageSecurityMode,
336        channel_security_policy: SecurityPolicy,
337    ) -> Result<(ExtensionObject, SignatureData), Error> {
338        let user_token_type = match &self.identity_token {
339            IdentityToken::Anonymous => UserTokenType::Anonymous,
340            IdentityToken::UserName(_, _) => UserTokenType::UserName,
341            IdentityToken::X509(_, _) => UserTokenType::Certificate,
342            IdentityToken::IssuedToken(_) => UserTokenType::IssuedToken,
343        };
344        let Some(policy) = self.endpoint.find_policy(user_token_type) else {
345            builder_error!(
346                self,
347                "Cannot find user token type {:?} for this endpoint, cannot connect",
348                user_token_type
349            );
350            return Err(Error::new(
351                StatusCode::BadSecurityPolicyRejected,
352                format!(
353                    "Cannot find user token type {user_token_type:?} for this endpoint, cannot connect"
354                ),
355            ));
356        };
357        let security_policy = if policy.security_policy_uri.is_empty() {
358            // Assume None
359            SecurityPolicy::None
360        } else {
361            SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
362        };
363
364        if security_policy == SecurityPolicy::Unknown {
365            error!("Unknown security policy {}", policy.security_policy_uri);
366            return Err(Error::new(
367                StatusCode::BadSecurityPolicyRejected,
368                format!("Unknown security policy {}", policy.security_policy_uri),
369            ));
370        }
371
372        match &self.identity_token {
373            IdentityToken::Anonymous => {
374                let identity_token = AnonymousIdentityToken {
375                    policy_id: policy.policy_id.clone(),
376                };
377                let identity_token = ExtensionObject::from_message(identity_token);
378                Ok((identity_token, SignatureData::null()))
379            }
380            IdentityToken::UserName(user, pass) => {
381                let nonce = remote_nonce.as_ref();
382                let cert = remote_cert;
383                let secret = legacy_encrypt_secret(
384                    channel_security_policy,
385                    security_mode,
386                    policy,
387                    nonce,
388                    cert,
389                    pass.0.as_bytes(),
390                )?;
391                let identity_token = UserNameIdentityToken {
392                    policy_id: secret.policy,
393                    user_name: UAString::from(user.as_str()),
394                    password: secret.secret,
395                    encryption_algorithm: secret.encryption_algorithm,
396                };
397                Ok((
398                    ExtensionObject::from_message(identity_token),
399                    SignatureData::null(),
400                ))
401            }
402            IdentityToken::X509(cert, private_key) => {
403                let nonce = remote_nonce.as_ref();
404                let server_cert = remote_cert;
405                let Some(server_cert) = &server_cert else {
406                    error!("Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature");
407                    return Err(Error::new(StatusCode::BadCertificateInvalid, "Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature"));
408                };
409
410                let user_token_signature = opcua_crypto::create_signature_data(
411                    private_key,
412                    security_policy,
413                    &server_cert.as_byte_string(),
414                    &ByteString::from(&nonce),
415                )
416                .map_err(|s| Error::new(s, "Failed to create token signature"))?;
417
418                // Create identity token
419                let identity_token = X509IdentityToken {
420                    policy_id: policy.policy_id.clone(),
421                    certificate_data: cert.as_byte_string(),
422                };
423
424                Ok((
425                    ExtensionObject::from_message(identity_token),
426                    user_token_signature,
427                ))
428            }
429            IdentityToken::IssuedToken(source) => {
430                let token = source.0.get_issued_token().await?;
431                let nonce = remote_nonce.as_ref();
432                let cert = remote_cert;
433                let secret = legacy_encrypt_secret(
434                    channel_security_policy,
435                    security_mode,
436                    policy,
437                    nonce,
438                    cert,
439                    token.as_ref(),
440                )?;
441                let identity_token = IssuedIdentityToken {
442                    policy_id: secret.policy,
443                    encryption_algorithm: secret.encryption_algorithm,
444                    token_data: secret.secret,
445                };
446                Ok((
447                    ExtensionObject::from_message(identity_token),
448                    SignatureData::null(),
449                ))
450            }
451        }
452    }
453
454    async fn build_request(
455        self,
456        channel: &AsyncSecureChannel,
457    ) -> Result<ActivateSessionRequest, StatusCode> {
458        let (remote_cert, remote_nonce, security_policy, message_security_mode) = {
459            let secure_channel = trace_read_lock!(channel.secure_channel);
460            (
461                secure_channel.remote_cert(),
462                secure_channel.remote_nonce_as_byte_string(),
463                secure_channel.security_policy(),
464                secure_channel.security_mode(),
465            )
466        };
467        let (user_identity_token, user_token_signature) = self
468            .user_identity_token(
469                &remote_nonce,
470                &remote_cert,
471                message_security_mode,
472                security_policy,
473            )
474            .await?;
475        let client_signature = match security_policy {
476            SecurityPolicy::None => SignatureData::null(),
477            _ => {
478                let Some(client_pkey) = self.private_key else {
479                    error!("Cannot create client signature - no pkey!");
480                    return Err(StatusCode::BadUnexpectedError);
481                };
482
483                let Some(server_cert) = remote_cert else {
484                    error!("Cannot sign server certificate because server cert is null");
485                    return Err(StatusCode::BadUnexpectedError);
486                };
487
488                let server_nonce = remote_nonce;
489                if server_nonce.is_null_or_empty() {
490                    error!("Cannot sign server certificate because server nonce is empty");
491                    return Err(StatusCode::BadUnexpectedError);
492                }
493
494                let server_cert = server_cert.as_byte_string();
495                opcua_crypto::create_signature_data(
496                    &client_pkey,
497                    security_policy,
498                    &server_cert,
499                    &server_nonce,
500                )?
501            }
502        };
503
504        Ok(ActivateSessionRequest {
505            request_header: self.header.header,
506            client_signature,
507            client_software_certificates: if self.client_software_certificates.is_empty() {
508                None
509            } else {
510                Some(self.client_software_certificates)
511            },
512            locale_ids: if self.locale_ids.is_empty() {
513                None
514            } else {
515                Some(self.locale_ids)
516            },
517            user_identity_token,
518            user_token_signature,
519        })
520    }
521}
522
523impl UARequest for ActivateSession {
524    type Out = ActivateSessionResponse;
525
526    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
527    where
528        Self: 'a,
529    {
530        let timeout = self.header.timeout;
531        let request = self.build_request(channel).await?;
532
533        let response = channel.send(request, timeout).await?;
534
535        if let ResponseMessage::ActivateSession(response) = response {
536            tracing::debug!("activate_session success");
537            // trace!("ActivateSessionResponse = {:#?}", response);
538            process_service_result(&response.response_header)?;
539            Ok(*response)
540        } else {
541            tracing::error!("activate_session failed");
542            Err(process_unexpected_response(response))
543        }
544    }
545}
546
547#[derive(Debug, Clone)]
548/// Close the session by sending a [`CloseSessionRequest`] to the server.
549///
550/// Note: Avoid using this on an session managed by the [`Session`] type,
551/// instead call [`Session::disconnect`].
552pub struct CloseSession {
553    delete_subscriptions: bool,
554    header: RequestHeaderBuilder,
555}
556
557builder_base!(CloseSession);
558
559impl CloseSession {
560    /// Create a new `CloseSession` request.
561    ///
562    /// Crate private as there is no way to use this safely.
563    pub(crate) fn new(session: &Session) -> Self {
564        Self {
565            delete_subscriptions: true,
566            header: RequestHeaderBuilder::new_from_session(session),
567        }
568    }
569
570    /// Create a new `CloseSession` request.
571    pub fn new_manual(
572        session_id: u32,
573        timeout: Duration,
574        auth_token: NodeId,
575        request_handle: IntegerId,
576    ) -> Self {
577        Self {
578            delete_subscriptions: true,
579            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
580        }
581    }
582
583    /// Set `DeleteSubscriptions`, indicating to the server whether it should
584    /// delete subscriptions immediately or wait for them to expire.
585    pub fn delete_subscriptions(mut self, delete_subscriptions: bool) -> Self {
586        self.delete_subscriptions = delete_subscriptions;
587        self
588    }
589}
590
591impl UARequest for CloseSession {
592    type Out = CloseSessionResponse;
593
594    async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
595    where
596        Self: 'a,
597    {
598        let request = CloseSessionRequest {
599            delete_subscriptions: self.delete_subscriptions,
600            request_header: self.header.header,
601        };
602        let response = channel.send(request, self.header.timeout).await?;
603        if let ResponseMessage::CloseSession(response) = response {
604            process_service_result(&response.response_header)?;
605            Ok(*response)
606        } else {
607            error!("close_session failed {:?}", response);
608            Err(process_unexpected_response(response))
609        }
610    }
611}
612
613#[derive(Debug, Clone)]
614/// Cancels an outstanding service request by sending a [`CancelRequest`] to the server.
615///
616/// See OPC UA Part 4 - Services 5.6.5 for complete description of the service and error responses.
617pub struct Cancel {
618    request_handle: IntegerId,
619    header: RequestHeaderBuilder,
620}
621
622builder_base!(Cancel);
623
624impl Cancel {
625    /// Create a new cancel request, to cancel a running service call.
626    pub fn new(request_to_cancel: IntegerId, session: &Session) -> Self {
627        Self {
628            request_handle: request_to_cancel,
629            header: RequestHeaderBuilder::new_from_session(session),
630        }
631    }
632
633    /// Create a new cancel request, to cancel a running service call.
634    pub fn new_manual(
635        request_to_cancel: IntegerId,
636        session_id: u32,
637        timeout: Duration,
638        auth_token: NodeId,
639        request_handle: IntegerId,
640    ) -> Self {
641        Self {
642            request_handle: request_to_cancel,
643            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
644        }
645    }
646}
647
648impl UARequest for Cancel {
649    type Out = CancelResponse;
650
651    async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
652    where
653        Self: 'a,
654    {
655        let request = CancelRequest {
656            request_header: self.header.header,
657            request_handle: self.request_handle,
658        };
659
660        let response = channel.send(request, self.header.timeout).await?;
661        if let ResponseMessage::Cancel(response) = response {
662            process_service_result(&response.response_header)?;
663            Ok(*response)
664        } else {
665            Err(process_unexpected_response(response))
666        }
667    }
668}
669
670impl Session {
671    /// Sends a [`CreateSessionRequest`] to the server, returning the session id of the created
672    /// session. Internally, the session will store the authentication token which is used for requests
673    /// subsequent to this call.
674    ///
675    /// See OPC UA Part 4 - Services 5.6.2 for complete description of the service and error responses.
676    ///
677    /// # Returns
678    ///
679    /// * `Ok(NodeId)` - Success, session id
680    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
681    ///
682    pub(crate) async fn create_session(&self) -> Result<NodeId, StatusCode> {
683        let response = CreateSession::new(self).send(&self.channel).await?;
684
685        let session_id = {
686            self.session_id.store(Arc::new(response.session_id.clone()));
687            response.session_id.clone()
688        };
689
690        Ok(session_id)
691    }
692
693    /// Sends an [`ActivateSessionRequest`] to the server to activate this session
694    ///
695    /// See OPC UA Part 4 - Services 5.6.3 for complete description of the service and error responses.
696    ///
697    /// # Returns
698    ///
699    /// * `Ok(())` - Success
700    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
701    ///
702    pub(crate) async fn activate_session(&self) -> Result<(), StatusCode> {
703        ActivateSession::new(self).send(&self.channel).await?;
704        Ok(())
705    }
706
707    /// Close the session by sending a [`CloseSessionRequest`] to the server.
708    ///
709    /// This is not accessible by users, they must instead call `disconnect` to properly close the session.
710    pub(crate) async fn close_session(&self, delete_subscriptions: bool) -> Result<(), StatusCode> {
711        CloseSession::new(self)
712            .delete_subscriptions(delete_subscriptions)
713            .send(&self.channel)
714            .await?;
715        Ok(())
716    }
717
718    /// Cancels an outstanding service request by sending a [`CancelRequest`] to the server.
719    ///
720    /// See OPC UA Part 4 - Services 5.6.5 for complete description of the service and error responses.
721    ///
722    /// # Arguments
723    ///
724    /// * `request_handle` - Handle to the outstanding request to be cancelled.
725    ///
726    /// # Returns
727    ///
728    /// * `Ok(u32)` - Success, number of cancelled requests
729    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
730    ///
731    pub async fn cancel(&self, request_handle: IntegerId) -> Result<u32, StatusCode> {
732        Ok(Cancel::new(request_handle, self)
733            .send(&self.channel)
734            .await?
735            .cancel_count)
736    }
737}