1use std::{
11 cmp,
12 collections::HashSet,
13 result::Result,
14 str::FromStr,
15 sync::{mpsc::SyncSender, Arc},
16 thread,
17};
18
19use tokio::{
20 sync::oneshot,
21 time::{interval, Duration, Instant},
22};
23
24use crate::{
25 client::{
26 callbacks::{OnConnectionStatusChange, OnSessionClosed, OnSubscriptionNotification},
27 client::IdentityToken,
28 comms::tcp_transport::TcpTransport,
29 process_service_result, process_unexpected_response,
30 session::{
31 services::*,
32 session_debug, session_error,
33 session_state::{ConnectionState, SessionState},
34 session_trace, session_warn,
35 },
36 session_retry_policy::{Answer, SessionRetryPolicy},
37 subscription::{self, Subscription},
38 subscription_state::SubscriptionState,
39 trust_any_server_cert,
40 },
41 core::{
42 comms::{
43 secure_channel::{Role, SecureChannel},
44 url::*,
45 },
46 supported_message::SupportedMessage,
47 RUNTIME,
48 },
49 crypto::{
50 self as crypto, user_identity::make_user_name_identity_token, CertificateStore,
51 SecurityPolicy, X509,
52 },
53 deregister_runtime_component, register_runtime_component,
54 sync::*,
55 types::{node_ids::ObjectId, status_code::StatusCode, *},
56};
57
58#[derive(Debug)]
61pub struct SessionInfo {
62 pub endpoint: EndpointDescription,
64 pub user_identity_token: IdentityToken,
66 pub preferred_locales: Vec<String>,
68}
69
70impl Into<SessionInfo> for EndpointDescription {
71 fn into(self) -> SessionInfo {
72 (self, IdentityToken::Anonymous).into()
73 }
74}
75
76impl Into<SessionInfo> for (EndpointDescription, IdentityToken) {
77 fn into(self) -> SessionInfo {
78 SessionInfo {
79 endpoint: self.0,
80 user_identity_token: self.1,
81 preferred_locales: Vec::new(),
82 }
83 }
84}
85
86#[derive(Debug)]
88pub enum SessionCommand {
89 Stop,
91}
92
93pub struct Session {
102 application_description: ApplicationDescription,
104 session_name: UAString,
106 session_info: SessionInfo,
108 session_state: Arc<RwLock<SessionState>>,
110 subscription_state: Arc<RwLock<SubscriptionState>>,
112 transport: TcpTransport,
114 certificate_store: Arc<RwLock<CertificateStore>>,
116 secure_channel: Arc<RwLock<SecureChannel>>,
118 session_retry_policy: Arc<Mutex<SessionRetryPolicy>>,
120 ignore_clock_skew: bool,
122 single_threaded_executor: bool,
124 runtime: Arc<Mutex<tokio::runtime::Runtime>>,
126}
127
128impl Drop for Session {
129 fn drop(&mut self) {
130 info!("Session has dropped");
131 self.disconnect();
132 }
133}
134
135impl Session {
136 pub(crate) fn new<T>(
150 application_description: ApplicationDescription,
151 session_name: T,
152 certificate_store: Arc<RwLock<CertificateStore>>,
153 session_info: SessionInfo,
154 session_retry_policy: SessionRetryPolicy,
155 decoding_options: DecodingOptions,
156 ignore_clock_skew: bool,
157 single_threaded_executor: bool,
158 ) -> Session
159 where
160 T: Into<UAString>,
161 {
162 let session_name = session_name.into();
163
164 let secure_channel = Arc::new(RwLock::new(SecureChannel::new(
165 certificate_store.clone(),
166 Role::Client,
167 decoding_options,
168 )));
169
170 let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));
171
172 let session_state = Arc::new(RwLock::new(SessionState::new(
173 ignore_clock_skew,
174 secure_channel.clone(),
175 subscription_state.clone(),
176 )));
177
178 let transport = TcpTransport::new(
179 secure_channel.clone(),
180 session_state.clone(),
181 single_threaded_executor,
182 );
183
184 let runtime = tokio::runtime::Builder::new_current_thread()
186 .enable_all()
187 .build()
188 .unwrap();
189
190 Session {
191 application_description,
192 session_name,
193 session_info,
194 session_state,
195 certificate_store,
196 subscription_state,
197 transport,
198 secure_channel,
199 session_retry_policy: Arc::new(Mutex::new(session_retry_policy)),
200 ignore_clock_skew,
201 single_threaded_executor,
202 runtime: Arc::new(Mutex::new(runtime)),
203 }
204 }
205
206 fn reset(&mut self) {
207 {
209 let mut secure_channel = trace_write_lock!(self.secure_channel);
210 secure_channel.clear_security_token();
211 }
212
213 self.session_state = Arc::new(RwLock::new(SessionState::new(
215 self.ignore_clock_skew,
216 self.secure_channel.clone(),
217 self.subscription_state.clone(),
218 )));
219
220 }
222
223 pub fn connect_and_activate(&mut self) -> Result<(), StatusCode> {
232 self.connect()?;
234 self.create_session()?;
235 self.activate_session()?;
236 Ok(())
237 }
238
239 pub fn set_session_retry_policy(&mut self, session_retry_policy: SessionRetryPolicy) {
248 self.session_retry_policy = Arc::new(Mutex::new(session_retry_policy));
249 }
250
251 pub fn set_session_closed_callback<CB>(&mut self, session_closed_callback: CB)
258 where
259 CB: OnSessionClosed + Send + Sync + 'static,
260 {
261 let mut session_state = trace_write_lock!(self.session_state);
262 session_state.set_session_closed_callback(session_closed_callback);
263 }
264
265 pub fn set_connection_status_callback<CB>(&mut self, connection_status_callback: CB)
273 where
274 CB: OnConnectionStatusChange + Send + Sync + 'static,
275 {
276 let mut session_state = trace_write_lock!(self.session_state);
277 session_state.set_connection_status_callback(connection_status_callback);
278 }
279
280 pub fn reconnect_and_activate(&mut self) -> Result<(), StatusCode> {
293 if self.is_connected() {
295 session_error!(
296 self,
297 "Reconnect is going to do nothing because already connected"
298 );
299 Err(StatusCode::BadUnexpectedError)
300 } else {
301 self.reset();
303
304 self.connect_no_retry()?;
306
307 match self.activate_session() {
309 Err(status_code) => {
310 info!("Session activation failed on reconnect, error = {}, so creating a new session", status_code);
312 {
313 let mut session_state = trace_write_lock!(self.session_state);
314 session_state.reset();
315 }
316
317 session_debug!(self, "create_session");
318 self.create_session()?;
319 session_debug!(self, "activate_session");
320 self.activate_session()?;
321 session_debug!(self, "reconnect should be complete");
322 }
323 Ok(_) => {
324 info!("Activation succeeded");
325 }
326 }
327 session_debug!(self, "transfer_subscriptions_from_old_session");
328 self.transfer_subscriptions_from_old_session()?;
329 Ok(())
330 }
331 }
332
333 fn transfer_subscriptions_from_old_session(&mut self) -> Result<(), StatusCode> {
336 let subscription_state = self.subscription_state.clone();
337
338 let subscription_ids = {
339 let subscription_state = trace_read_lock!(subscription_state);
340 subscription_state.subscription_ids()
341 };
342
343 if let Some(subscription_ids) = subscription_ids {
345 let mut subscription_ids_to_recreate =
348 subscription_ids.iter().copied().collect::<HashSet<u32>>();
349 if let Ok(transfer_results) = self.transfer_subscriptions(&subscription_ids, true) {
350 session_debug!(self, "transfer_results = {:?}", transfer_results);
351 transfer_results.iter().enumerate().for_each(|(i, r)| {
352 if r.status_code.is_good() {
353 subscription_ids_to_recreate.remove(&subscription_ids[i]);
355 }
356 });
357 }
358
359 if !subscription_ids_to_recreate.is_empty() {
361 session_warn!(self, "Some or all of the existing subscriptions could not be transferred and must be created manually");
362 }
363
364 subscription_ids_to_recreate
366 .iter()
367 .for_each(|subscription_id| {
368 info!("Recreating subscription {}", subscription_id);
369 let deleted_subscription = {
371 let mut subscription_state = trace_write_lock!(subscription_state);
372 subscription_state.delete_subscription(*subscription_id)
373 };
374
375 if let Some(subscription) = deleted_subscription {
376 if let Ok(subscription_id) = self.create_subscription_inner(
378 subscription.publishing_interval(),
379 subscription.lifetime_count(),
380 subscription.max_keep_alive_count(),
381 subscription.max_notifications_per_publish(),
382 subscription.priority(),
383 subscription.publishing_enabled(),
384 subscription.notification_callback(),
385 ) {
386 info!("New subscription created with id {}", subscription_id);
387
388 let items_to_create = subscription
390 .monitored_items()
391 .iter()
392 .map(|(_, item)| MonitoredItemCreateRequest {
393 item_to_monitor: item.item_to_monitor().clone(),
394 monitoring_mode: item.monitoring_mode(),
395 requested_parameters: MonitoringParameters {
396 client_handle: item.client_handle(),
397 sampling_interval: item.sampling_interval(),
398 filter: ExtensionObject::null(),
399 queue_size: item.queue_size() as u32,
400 discard_oldest: true,
401 },
402 })
403 .collect::<Vec<MonitoredItemCreateRequest>>();
404 let _ = self.create_monitored_items(
405 subscription_id,
406 TimestampsToReturn::Both,
407 &items_to_create,
408 );
409
410 subscription.monitored_items().iter().for_each(|(_, item)| {
413 let triggered_items = item.triggered_items();
414 if !triggered_items.is_empty() {
415 let links_to_add =
416 triggered_items.iter().copied().collect::<Vec<u32>>();
417 let _ = self.set_triggering(
418 subscription_id,
419 item.id(),
420 links_to_add.as_slice(),
421 &[],
422 );
423 }
424 });
425 } else {
426 session_warn!(
427 self,
428 "Could not create a subscription from the existing subscription {}",
429 subscription_id
430 );
431 }
432 } else {
433 panic!(
434 "Subscription {}, doesn't exist although it should",
435 subscription_id
436 );
437 }
438 });
439 }
440 Ok(())
441 }
442
443 pub fn connect(&self) -> Result<(), StatusCode> {
447 loop {
448 match self.connect_no_retry() {
449 Ok(_) => {
450 info!("Connect was successful");
451 let mut session_retry_policy = trace_lock!(self.session_retry_policy);
452 session_retry_policy.reset_retry_count();
453 return Ok(());
454 }
455 Err(status_code) => {
456 self.disconnect();
457 let mut session_retry_policy = trace_lock!(self.session_retry_policy);
458 session_retry_policy.increment_retry_count();
459 session_warn!(
460 self,
461 "Connect was unsuccessful, error = {}, retries = {}",
462 status_code,
463 session_retry_policy.retry_count()
464 );
465
466 match session_retry_policy.should_retry_connect(DateTime::now()) {
467 Answer::GiveUp => {
468 session_error!(self, "Session has given up trying to connect to the server after {} retries", session_retry_policy.retry_count());
469 return Err(StatusCode::BadNotConnected);
470 }
471 Answer::Retry => {
472 info!("Retrying to connect to server...");
473 session_retry_policy.set_last_attempt(DateTime::now());
474 }
475 Answer::WaitFor(sleep_for) => {
476 thread::sleep(Duration::from_millis(sleep_for as u64));
479 }
480 }
481 }
482 }
483 }
484 }
485
486 pub fn connect_no_retry(&self) -> Result<(), StatusCode> {
496 let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
497 info!("Connect");
498 let security_policy =
499 SecurityPolicy::from_str(self.session_info.endpoint.security_policy_uri.as_ref())
500 .unwrap();
501 if security_policy == SecurityPolicy::Unknown {
502 session_error!(
503 self,
504 "connect, security policy \"{}\" is unknown",
505 self.session_info.endpoint.security_policy_uri.as_ref()
506 );
507 Err(StatusCode::BadSecurityPolicyRejected)
508 } else {
509 let (cert, key) = {
510 let certificate_store = trace_write_lock!(self.certificate_store);
511 certificate_store.read_own_cert_and_pkey_optional()
512 };
513
514 {
515 let mut secure_channel = trace_write_lock!(self.secure_channel);
516 secure_channel.set_private_key(key);
517 secure_channel.set_cert(cert);
518 secure_channel.set_security_policy(security_policy);
519 secure_channel.set_security_mode(self.session_info.endpoint.security_mode);
520 let _ = secure_channel.set_remote_cert_from_byte_string(
521 &self.session_info.endpoint.server_certificate,
522 );
523 info!("Security policy = {:?}", security_policy);
524 info!(
525 "Security mode = {:?}",
526 self.session_info.endpoint.security_mode
527 );
528 }
529
530 self.transport.connect(endpoint_url.as_ref())?;
532 self.open_secure_channel()?;
533 self.on_connection_status_change(true);
534 Ok(())
535 }
536 }
537
538 pub(crate) fn session_state(&self) -> Arc<RwLock<SessionState>> {
539 self.session_state.clone()
540 }
541
542 pub fn disconnect(&self) {
546 if self.is_connected() {
547 let _ = self.close_session_and_delete_subscriptions();
548 let _ = self.close_secure_channel();
549
550 {
551 let session_state = trace_read_lock!(self.session_state);
552 session_state.quit();
553 }
554
555 self.transport.wait_for_disconnect();
556 self.on_connection_status_change(false);
557 }
558 }
559
560 pub fn is_connected(&self) -> bool {
568 self.transport.is_connected()
569 }
570
571 const POLL_SLEEP_INTERVAL: u64 = 10;
573
574 pub fn run(session: Arc<RwLock<Session>>) {
584 let (_tx, rx) = oneshot::channel();
585 Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
586 }
587
588 pub fn run_async(session: Arc<RwLock<Session>>) -> oneshot::Sender<SessionCommand> {
611 let (tx, rx) = oneshot::channel();
612 thread::spawn(move || Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx));
613 tx
614 }
615
616 pub async fn session_task(
622 session: Arc<RwLock<Session>>,
623 sleep_interval: u64,
624 rx: oneshot::Receiver<SessionCommand>,
625 ) {
626 tokio::select! {
627 _ = async {
628 let mut timer = interval(Duration::from_millis(sleep_interval));
629 loop {
630 let poll_result = {
632 let mut session = session.write();
633 session.poll().await
634 };
635 match poll_result {
636 Ok(did_something) => {
637 if !did_something {
639 timer.tick().await;
640 }
641 }
642 Err(_) => {
643 info!("Run session connection to server broke, so terminating");
645 break;
646 }
647 }
648 }
649 } => {}
650 message = rx => {
651 if let Ok(message) = message {
652 info!("Run session was terminated by a message {:?}", message);
654 }
655 else {
656 warn!("Run session was terminated, presumably by caller dropping oneshot sender. Don't do that unless you meant to.");
657 }
658 }
659 }
660 }
661
662 pub fn run_loop(
672 session: Arc<RwLock<Session>>,
673 sleep_interval: u64,
674 rx: oneshot::Receiver<SessionCommand>,
675 ) {
676 let task = {
677 let session = session.clone();
678 async move {
679 Self::session_task(session, sleep_interval, rx).await;
680 }
681 };
682 let runtime = {
684 let session = trace_read_lock!(session);
685 session.runtime.clone()
686 };
687 let runtime = trace_lock!(runtime);
688 runtime.block_on(task);
689 }
690
691 pub async fn poll(&mut self) -> Result<bool, ()> {
706 let did_something = if self.is_connected() {
707 let mut session_state = trace_write_lock!(self.session_state);
708 session_state.handle_publish_responses()
709 } else {
710 let should_retry_connect = {
711 let session_retry_policy = trace_lock!(self.session_retry_policy);
712 session_retry_policy.should_retry_connect(DateTime::now())
713 };
714 match should_retry_connect {
715 Answer::GiveUp => {
716 let session_retry_policy = trace_lock!(self.session_retry_policy);
717 session_error!(
718 self,
719 "Session has given up trying to reconnect to the server after {} retries",
720 session_retry_policy.retry_count()
721 );
722 return Err(());
723 }
724 Answer::Retry => {
725 info!("Retrying to reconnect to server...");
726 {
727 let mut session_retry_policy = trace_lock!(self.session_retry_policy);
728 session_retry_policy.set_last_attempt(DateTime::now());
729 }
730 if self.reconnect_and_activate().is_ok() {
731 info!("Retry to connect was successful");
732 let mut session_retry_policy = trace_lock!(self.session_retry_policy);
733 session_retry_policy.reset_retry_count();
734 } else {
735 let mut session_retry_policy = trace_lock!(self.session_retry_policy);
736 session_retry_policy.increment_retry_count();
737 session_warn!(
738 self,
739 "Reconnect was unsuccessful, retries = {}",
740 session_retry_policy.retry_count()
741 );
742 drop(session_retry_policy);
743 self.disconnect();
744 }
745 true
746 }
747 Answer::WaitFor(_) => {
748 false
751 }
752 }
753 };
754 Ok(did_something)
755 }
756
757 fn spawn_session_activity_task(&self, session_timeout: f64) {
765 session_debug!(self, "spawn_session_activity_task({})", session_timeout);
766
767 let connection_state = {
768 let session_state = trace_read_lock!(self.session_state);
769 session_state.connection_state()
770 };
771
772 let session_state = self.session_state.clone();
773
774 const MIN_SESSION_ACTIVITY_MS: u64 = 1000;
776 let session_activity = cmp::max((session_timeout as u64 / 4) * 3, MIN_SESSION_ACTIVITY_MS);
777 session_debug!(
778 self,
779 "session timeout is {}, activity timer is {}",
780 session_timeout,
781 session_activity
782 );
783
784 let id = format!("session-activity-thread-{:?}", thread::current().id());
785 let runtime = trace_lock!(self.runtime);
786 runtime.spawn(async move {
787 register_runtime_component!(&id);
788 let session_activity_interval = Duration::from_millis(session_activity);
791 let mut timer = interval(Duration::from_millis(MIN_SESSION_ACTIVITY_MS));
792 let mut last_timeout = Instant::now();
793
794 loop {
795 timer.tick().await;
796
797 if connection_state.is_finished() {
798 info!("Session activity timer is terminating");
799 break;
800 }
801
802 let now = Instant::now();
804
805 let interval = now - last_timeout;
807 if interval > session_activity_interval {
808 match connection_state.state() {
809 ConnectionState::Processing => {
810 info!("Session activity keep-alive request");
811 let mut session_state = trace_write_lock!(session_state);
812 let request_header = session_state.make_request_header();
813 let request = ReadRequest {
814 request_header,
815 max_age: 1f64,
816 timestamps_to_return: TimestampsToReturn::Server,
817 nodes_to_read: Some(vec![]),
818 };
819 let _ = session_state.async_send_request(request, None);
821 }
822 connection_state => {
823 info!("Session activity keep-alive is doing nothing - connection state = {:?}", connection_state);
824 }
825 };
826 last_timeout = now;
827 }
828 }
829
830 info!("Session activity timer task is finished");
831 deregister_runtime_component!(&id);
832 });
833 }
834
835 fn spawn_subscription_activity_task(&self) {
839 session_debug!(self, "spawn_subscription_activity_task",);
840
841 let connection_state = {
842 let session_state = trace_read_lock!(self.session_state);
843 session_state.connection_state()
844 };
845
846 const MIN_SUBSCRIPTION_ACTIVITY_MS: u64 = 1000;
847 let session_state = self.session_state.clone();
848 let subscription_state = self.subscription_state.clone();
849
850 let id = format!("subscription-activity-thread-{:?}", thread::current().id());
851 let runtime = trace_lock!(self.runtime);
852 runtime.spawn(async move {
853 register_runtime_component!(&id);
854
855 let mut timer = interval(Duration::from_millis(MIN_SUBSCRIPTION_ACTIVITY_MS));
858
859 let mut last_timeout: Instant;
860 let mut subscription_activity_interval: Duration;
861
862 loop {
863 timer.tick().await;
864
865 if connection_state.is_finished() {
866 info!("Session activity timer is terminating");
867 break;
868 }
869
870 if let (Some(keep_alive_timeout), last_publish_request) = {
871 let subscription_state = trace_read_lock!(subscription_state);
872 (
873 subscription_state.keep_alive_timeout(),
874 subscription_state.last_publish_request(),
875 )
876 } {
877 subscription_activity_interval =
878 Duration::from_millis((keep_alive_timeout / 4) * 3);
879 last_timeout = last_publish_request;
880
881 let now = Instant::now();
883
884 let interval = now - last_timeout;
886 if interval > subscription_activity_interval {
887 let mut session_state = trace_write_lock!(session_state);
888 let _ = session_state.async_publish();
889 }
890 }
891 }
892
893 info!("Subscription activity timer task is finished");
894 deregister_runtime_component!(&id);
895 });
896 }
897
898 fn create_subscription_inner(
900 &self,
901 publishing_interval: f64,
902 lifetime_count: u32,
903 max_keep_alive_count: u32,
904 max_notifications_per_publish: u32,
905 priority: u8,
906 publishing_enabled: bool,
907 callback: Arc<Mutex<dyn OnSubscriptionNotification + Send + Sync + 'static>>,
908 ) -> Result<u32, StatusCode> {
909 let request = CreateSubscriptionRequest {
910 request_header: self.make_request_header(),
911 requested_publishing_interval: publishing_interval,
912 requested_lifetime_count: lifetime_count,
913 requested_max_keep_alive_count: max_keep_alive_count,
914 max_notifications_per_publish,
915 publishing_enabled,
916 priority,
917 };
918 let response = self.send_request(request)?;
919 if let SupportedMessage::CreateSubscriptionResponse(response) = response {
920 process_service_result(&response.response_header)?;
921 let subscription = Subscription::new(
922 response.subscription_id,
923 response.revised_publishing_interval,
924 response.revised_lifetime_count,
925 response.revised_max_keep_alive_count,
926 max_notifications_per_publish,
927 publishing_enabled,
928 priority,
929 callback,
930 );
931
932 {
934 let mut subscription_state = trace_write_lock!(self.subscription_state);
935 subscription_state.add_subscription(subscription);
936 }
937
938 {
940 let mut session_state = trace_write_lock!(self.session_state);
941 let _ = session_state.async_publish();
942 }
943
944 session_debug!(
945 self,
946 "create_subscription, created a subscription with id {}",
947 response.subscription_id
948 );
949 Ok(response.subscription_id)
950 } else {
951 session_error!(self, "create_subscription failed {:?}", response);
952 Err(process_unexpected_response(response))
953 }
954 }
955
956 pub fn delete_all_subscriptions(&self) -> Result<Vec<(u32, StatusCode)>, StatusCode> {
967 let subscription_ids = {
968 let subscription_state = trace_read_lock!(self.subscription_state);
969 subscription_state.subscription_ids()
970 };
971 if let Some(ref subscription_ids) = subscription_ids {
972 let status_codes = self.delete_subscriptions(subscription_ids.as_slice())?;
973 Ok(subscription_ids
975 .iter()
976 .zip(status_codes)
977 .map(|(id, status_code)| (*id, status_code))
978 .collect())
979 } else {
980 session_trace!(
982 self,
983 "delete_all_subscriptions, called when there are no subscriptions"
984 );
985 Err(StatusCode::BadNothingToDo)
986 }
987 }
988
989 pub fn close_session_and_delete_subscriptions(&self) -> Result<(), StatusCode> {
999 if !self.is_connected() {
1000 return Err(StatusCode::BadNotConnected);
1001 }
1002 if trace_read_lock!(self.session_state).session_id().identifier == Identifier::Numeric(0) {
1005 return Ok(());
1006 }
1007 let request = CloseSessionRequest {
1008 delete_subscriptions: true,
1009 request_header: self.make_request_header(),
1010 };
1011 let response = self.send_request(request)?;
1012 if let SupportedMessage::CloseSessionResponse(_) = response {
1013 let mut subscription_state = trace_write_lock!(self.subscription_state);
1014 if let Some(subscription_ids) = subscription_state.subscription_ids() {
1015 for subscription_id in subscription_ids {
1016 subscription_state.delete_subscription(subscription_id);
1017 }
1018 }
1019 Ok(())
1020 } else {
1021 session_error!(self, "close_session failed {:?}", response);
1022 Err(process_unexpected_response(response))
1023 }
1024 }
1025
1026 pub fn subscription_state(&self) -> Arc<RwLock<SubscriptionState>> {
1028 self.subscription_state.clone()
1029 }
1030
1031 pub(crate) fn session_id(&self) -> String {
1033 let session_state = self.session_state();
1034 let session_state = session_state.read();
1035 format!("session:{}", session_state.id())
1036 }
1037
1038 fn on_connection_status_change(&self, connected: bool) {
1040 let mut session_state = trace_write_lock!(self.session_state);
1041 session_state.on_connection_status_change(connected);
1042 }
1043
1044 fn security_policy(&self) -> SecurityPolicy {
1046 let secure_channel = trace_read_lock!(self.secure_channel);
1047 secure_channel.security_policy()
1048 }
1049
1050 fn subscription_exists(&self, subscription_id: u32) -> bool {
1052 let subscription_state = trace_read_lock!(self.subscription_state);
1053 subscription_state.subscription_exists(subscription_id)
1054 }
1055
1056 fn user_identity_token(
1059 &self,
1060 server_cert: &Option<X509>,
1061 server_nonce: &[u8],
1062 ) -> Result<(ExtensionObject, SignatureData), StatusCode> {
1063 let user_identity_token = &self.session_info.user_identity_token;
1064 let user_token_type = match user_identity_token {
1065 IdentityToken::Anonymous => UserTokenType::Anonymous,
1066 IdentityToken::UserName(_, _) => UserTokenType::UserName,
1067 IdentityToken::X509(_, _) => UserTokenType::Certificate,
1068 };
1069
1070 let endpoint = &self.session_info.endpoint;
1071 let policy = endpoint.find_policy(user_token_type);
1072 session_debug!(self, "Endpoint policy = {:?}", policy);
1073
1074 match policy {
1076 None => {
1077 session_error!(
1078 self,
1079 "Cannot find user token type {:?} for this endpoint, cannot connect",
1080 user_token_type
1081 );
1082 Err(StatusCode::BadSecurityPolicyRejected)
1083 }
1084 Some(policy) => {
1085 let security_policy = if policy.security_policy_uri.is_null() {
1086 SecurityPolicy::None
1088 } else {
1089 SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
1090 };
1091 if security_policy == SecurityPolicy::Unknown {
1092 session_error!(
1093 self,
1094 "Can't support the security policy {}",
1095 policy.security_policy_uri
1096 );
1097 Err(StatusCode::BadSecurityPolicyRejected)
1098 } else {
1099 match user_identity_token {
1100 IdentityToken::Anonymous => {
1101 let identity_token = AnonymousIdentityToken {
1102 policy_id: policy.policy_id.clone(),
1103 };
1104 let identity_token = ExtensionObject::from_encodable(
1105 ObjectId::AnonymousIdentityToken_Encoding_DefaultBinary,
1106 &identity_token,
1107 );
1108 Ok((identity_token, SignatureData::null()))
1109 }
1110 IdentityToken::UserName(ref user, ref pass) => {
1111 let secure_channel = trace_read_lock!(self.secure_channel);
1112 let identity_token = self.make_user_name_identity_token(
1113 &secure_channel,
1114 policy,
1115 user,
1116 pass,
1117 )?;
1118 let identity_token = ExtensionObject::from_encodable(
1119 ObjectId::UserNameIdentityToken_Encoding_DefaultBinary,
1120 &identity_token,
1121 );
1122 Ok((identity_token, SignatureData::null()))
1123 }
1124 IdentityToken::X509(ref cert_path, ref private_key_path) => {
1125 if let Some(ref server_cert) = server_cert {
1126 let certificate_data = CertificateStore::read_cert(cert_path)
1128 .map_err(|e| {
1129 session_error!(
1130 self,
1131 "Certificate cannot be loaded from path {}, error = {}",
1132 cert_path.to_str().unwrap(),
1133 e
1134 );
1135 StatusCode::BadSecurityPolicyRejected
1136 })?;
1137 let private_key = CertificateStore::read_pkey(private_key_path)
1138 .map_err(|e| {
1139 session_error!(
1140 self,
1141 "Private key cannot be loaded from path {}, error = {}",
1142 private_key_path.to_str().unwrap(),
1143 e
1144 );
1145 StatusCode::BadSecurityPolicyRejected
1146 })?;
1147
1148 let user_token_signature = crypto::create_signature_data(
1150 &private_key,
1151 security_policy,
1152 &server_cert.as_byte_string(),
1153 &ByteString::from(server_nonce),
1154 )?;
1155
1156 let identity_token = X509IdentityToken {
1158 policy_id: policy.policy_id.clone(),
1159 certificate_data: certificate_data.as_byte_string(),
1160 };
1161 let identity_token = ExtensionObject::from_encodable(
1162 ObjectId::X509IdentityToken_Encoding_DefaultBinary,
1163 &identity_token,
1164 );
1165
1166 Ok((identity_token, user_token_signature))
1167 } else {
1168 session_error!(self, "Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature");
1169 Err(StatusCode::BadCertificateInvalid)
1170 }
1171 }
1172 }
1173 }
1174 }
1175 }
1176 }
1177
1178 fn make_user_name_identity_token(
1181 &self,
1182 secure_channel: &SecureChannel,
1183 user_token_policy: &UserTokenPolicy,
1184 user: &str,
1185 pass: &str,
1186 ) -> Result<UserNameIdentityToken, StatusCode> {
1187 let channel_security_policy = secure_channel.security_policy();
1188 let nonce = secure_channel.remote_nonce();
1189 let cert = secure_channel.remote_cert();
1190 make_user_name_identity_token(
1191 channel_security_policy,
1192 user_token_policy,
1193 nonce,
1194 &cert,
1195 user,
1196 pass,
1197 )
1198 }
1199}
1200
1201impl Service for Session {
1202 fn make_request_header(&self) -> RequestHeader {
1205 let mut session_state = trace_write_lock!(self.session_state);
1206 session_state.make_request_header()
1207 }
1208
1209 fn send_request<T>(&self, request: T) -> Result<SupportedMessage, StatusCode>
1211 where
1212 T: Into<SupportedMessage>,
1213 {
1214 let mut session_state = trace_write_lock!(self.session_state);
1215 session_state.send_request(request)
1216 }
1217
1218 fn async_send_request<T>(
1220 &self,
1221 request: T,
1222 sender: Option<SyncSender<SupportedMessage>>,
1223 ) -> Result<u32, StatusCode>
1224 where
1225 T: Into<SupportedMessage>,
1226 {
1227 let mut session_state = trace_write_lock!(self.session_state);
1228 session_state.async_send_request(request, sender)
1229 }
1230}
1231
1232impl DiscoveryService for Session {
1233 fn find_servers<T>(&self, endpoint_url: T) -> Result<Vec<ApplicationDescription>, StatusCode>
1234 where
1235 T: Into<UAString>,
1236 {
1237 let request = FindServersRequest {
1238 request_header: self.make_request_header(),
1239 endpoint_url: endpoint_url.into(),
1240 locale_ids: None,
1241 server_uris: None,
1242 };
1243 let response = self.send_request(request)?;
1244 if let SupportedMessage::FindServersResponse(response) = response {
1245 process_service_result(&response.response_header)?;
1246 let servers = if let Some(servers) = response.servers {
1247 servers
1248 } else {
1249 Vec::new()
1250 };
1251 Ok(servers)
1252 } else {
1253 Err(process_unexpected_response(response))
1254 }
1255 }
1256
1257 fn get_endpoints(&self) -> Result<Vec<EndpointDescription>, StatusCode> {
1258 session_debug!(self, "get_endpoints");
1259 let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
1260
1261 let request = GetEndpointsRequest {
1262 request_header: self.make_request_header(),
1263 endpoint_url,
1264 locale_ids: None,
1265 profile_uris: None,
1266 };
1267
1268 let response = self.send_request(request)?;
1269 if let SupportedMessage::GetEndpointsResponse(response) = response {
1270 process_service_result(&response.response_header)?;
1271 match response.endpoints {
1272 None => {
1273 session_debug!(self, "get_endpoints, success but no endpoints");
1274 Ok(Vec::new())
1275 }
1276 Some(endpoints) => {
1277 session_debug!(self, "get_endpoints, success");
1278 Ok(endpoints)
1279 }
1280 }
1281 } else {
1282 session_error!(self, "get_endpoints failed {:?}", response);
1283 Err(process_unexpected_response(response))
1284 }
1285 }
1286
1287 fn register_server(&self, server: RegisteredServer) -> Result<(), StatusCode> {
1288 let request = RegisterServerRequest {
1289 request_header: self.make_request_header(),
1290 server,
1291 };
1292 let response = self.send_request(request)?;
1293 if let SupportedMessage::RegisterServerResponse(response) = response {
1294 process_service_result(&response.response_header)?;
1295 Ok(())
1296 } else {
1297 Err(process_unexpected_response(response))
1298 }
1299 }
1300}
1301
1302impl SecureChannelService for Session {
1303 fn open_secure_channel(&self) -> Result<(), StatusCode> {
1304 session_debug!(self, "open_secure_channel");
1305 let mut session_state = trace_write_lock!(self.session_state);
1306 session_state.issue_or_renew_secure_channel(SecurityTokenRequestType::Issue)
1307 }
1308
1309 fn close_secure_channel(&self) -> Result<(), StatusCode> {
1310 let request = CloseSecureChannelRequest {
1311 request_header: self.make_request_header(),
1312 };
1313 let _ = self.async_send_request(request, None);
1315 Ok(())
1316 }
1317}
1318
1319impl SessionService for Session {
1320 fn create_session(&self) -> Result<NodeId, StatusCode> {
1321 let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
1323
1324 let client_nonce = {
1325 let secure_channel = trace_read_lock!(self.secure_channel);
1326 secure_channel.local_nonce_as_byte_string()
1327 };
1328
1329 let server_uri = UAString::null();
1330 let session_name = self.session_name.clone();
1331
1332 let (client_certificate, _) = {
1333 let certificate_store = trace_write_lock!(self.certificate_store);
1334 certificate_store.read_own_cert_and_pkey_optional()
1335 };
1336
1337 let client_certificate = if let Some(ref client_certificate) = client_certificate {
1339 client_certificate.as_byte_string()
1340 } else {
1341 ByteString::null()
1342 };
1343
1344 let requested_session_timeout = {
1346 let session_retry_policy = trace_lock!(self.session_retry_policy);
1347 session_retry_policy.session_timeout()
1348 };
1349
1350 let request = CreateSessionRequest {
1351 request_header: self.make_request_header(),
1352 client_description: self.application_description.clone(),
1353 server_uri,
1354 endpoint_url,
1355 session_name,
1356 client_nonce,
1357 client_certificate,
1358 requested_session_timeout,
1359 max_response_message_size: 0,
1360 };
1361
1362 session_debug!(self, "CreateSessionRequest = {:?}", request);
1363
1364 let response = self.send_request(request)?;
1365 if let SupportedMessage::CreateSessionResponse(response) = response {
1366 process_service_result(&response.response_header)?;
1367
1368 let session_id = {
1369 let mut session_state = trace_write_lock!(self.session_state);
1370 session_state.set_session_id(response.session_id.clone());
1371 session_state.set_authentication_token(response.authentication_token.clone());
1372 {
1373 let mut secure_channel = trace_write_lock!(self.secure_channel);
1374 let _ =
1375 secure_channel.set_remote_nonce_from_byte_string(&response.server_nonce);
1376 let _ = secure_channel
1377 .set_remote_cert_from_byte_string(&response.server_certificate);
1378 }
1379 if self.ignore_clock_skew && !response.response_header.timestamp.is_null() {
1382 let offset = response.response_header.timestamp - DateTime::now();
1383 session_state.set_client_offset(offset);
1385 }
1386 session_state.session_id()
1387 };
1388
1389 let security_policy = self.security_policy();
1393 let cert_status_code = if security_policy != SecurityPolicy::None {
1394 if let Ok(server_certificate) =
1395 crypto::X509::from_byte_string(&response.server_certificate)
1396 {
1397 let hostname =
1399 hostname_from_url(self.session_info.endpoint.endpoint_url.as_ref())
1400 .map_err(|_| StatusCode::BadUnexpectedError)?;
1401 let application_uri =
1402 self.session_info.endpoint.server.application_uri.as_ref();
1403
1404 let certificate_store = trace_write_lock!(self.certificate_store);
1405 let result = certificate_store.validate_or_reject_application_instance_cert(
1406 &server_certificate,
1407 security_policy,
1408 Some(&hostname),
1409 Some(application_uri),
1410 );
1411 if result.is_bad() {
1412 result
1413 } else {
1414 StatusCode::Good
1415 }
1416 } else {
1417 session_error!(self, "Server did not supply a valid X509 certificate");
1418 StatusCode::BadCertificateInvalid
1419 }
1420 } else {
1421 StatusCode::Good
1422 };
1423
1424 if !cert_status_code.is_good() && !trust_any_server_cert() {
1425 session_error!(self, "Server's certificate was rejected");
1426 Err(cert_status_code)
1427 } else {
1428 session_debug!(
1431 self,
1432 "Revised session timeout is {}",
1433 response.revised_session_timeout
1434 );
1435 self.spawn_session_activity_task(response.revised_session_timeout);
1436 self.spawn_subscription_activity_task();
1437
1438 Ok(session_id)
1441 }
1442 } else {
1443 Err(process_unexpected_response(response))
1444 }
1445 }
1446
1447 fn activate_session(&self) -> Result<(), StatusCode> {
1448 let (user_identity_token, user_token_signature) = {
1449 let secure_channel = trace_read_lock!(self.secure_channel);
1450 self.user_identity_token(&secure_channel.remote_cert(), secure_channel.remote_nonce())?
1451 };
1452
1453 let locale_ids = if self.session_info.preferred_locales.is_empty() {
1454 None
1455 } else {
1456 let locale_ids = self
1457 .session_info
1458 .preferred_locales
1459 .iter()
1460 .map(UAString::from)
1461 .collect();
1462 Some(locale_ids)
1463 };
1464
1465 let security_policy = self.security_policy();
1466 let client_signature = match security_policy {
1467 SecurityPolicy::None => SignatureData::null(),
1468 _ => {
1469 let secure_channel = trace_read_lock!(self.secure_channel);
1470 let server_cert = secure_channel.remote_cert();
1471 let server_nonce = secure_channel.remote_nonce();
1472
1473 let (_, client_pkey) = {
1474 let certificate_store = trace_write_lock!(self.certificate_store);
1475 certificate_store.read_own_cert_and_pkey_optional()
1476 };
1477
1478 if client_pkey.is_none() {
1480 session_error!(self, "Cannot create client signature - no pkey!");
1481 return Err(StatusCode::BadUnexpectedError);
1482 } else if server_cert.is_none() {
1483 session_error!(
1484 self,
1485 "Cannot sign server certificate because server cert is null"
1486 );
1487 return Err(StatusCode::BadUnexpectedError);
1488 } else if server_nonce.is_empty() {
1489 session_error!(
1490 self,
1491 "Cannot sign server certificate because server nonce is empty"
1492 );
1493 return Err(StatusCode::BadUnexpectedError);
1494 }
1495
1496 let server_cert = secure_channel
1497 .remote_cert()
1498 .as_ref()
1499 .unwrap()
1500 .as_byte_string();
1501 let server_nonce = ByteString::from(secure_channel.remote_nonce());
1502 let signing_key = client_pkey.as_ref().unwrap();
1503 crypto::create_signature_data(
1504 signing_key,
1505 security_policy,
1506 &server_cert,
1507 &server_nonce,
1508 )?
1509 }
1510 };
1511
1512 let client_software_certificates = None;
1513
1514 let request = ActivateSessionRequest {
1515 request_header: self.make_request_header(),
1516 client_signature,
1517 client_software_certificates,
1518 locale_ids,
1519 user_identity_token,
1520 user_token_signature,
1521 };
1522
1523 let response = self.send_request(request)?;
1526 if let SupportedMessage::ActivateSessionResponse(response) = response {
1527 process_service_result(&response.response_header)?;
1529 Ok(())
1530 } else {
1531 Err(process_unexpected_response(response))
1532 }
1533 }
1534
1535 fn cancel(&self, request_handle: IntegerId) -> Result<u32, StatusCode> {
1536 let request = CancelRequest {
1537 request_header: self.make_request_header(),
1538 request_handle,
1539 };
1540 let response = self.send_request(request)?;
1541 if let SupportedMessage::CancelResponse(response) = response {
1542 process_service_result(&response.response_header)?;
1543 Ok(response.cancel_count)
1544 } else {
1545 Err(process_unexpected_response(response))
1546 }
1547 }
1548}
1549
1550impl SubscriptionService for Session {
1551 fn create_subscription<CB>(
1552 &self,
1553 publishing_interval: f64,
1554 lifetime_count: u32,
1555 max_keep_alive_count: u32,
1556 max_notifications_per_publish: u32,
1557 priority: u8,
1558 publishing_enabled: bool,
1559 callback: CB,
1560 ) -> Result<u32, StatusCode>
1561 where
1562 CB: OnSubscriptionNotification + Send + Sync + 'static,
1563 {
1564 self.create_subscription_inner(
1565 publishing_interval,
1566 lifetime_count,
1567 max_keep_alive_count,
1568 max_notifications_per_publish,
1569 priority,
1570 publishing_enabled,
1571 Arc::new(Mutex::new(callback)),
1572 )
1573 }
1574
1575 fn modify_subscription(
1576 &self,
1577 subscription_id: u32,
1578 publishing_interval: f64,
1579 lifetime_count: u32,
1580 max_keep_alive_count: u32,
1581 max_notifications_per_publish: u32,
1582 priority: u8,
1583 ) -> Result<(), StatusCode> {
1584 if subscription_id == 0 {
1585 session_error!(self, "modify_subscription, subscription id must be non-zero, or the subscription is considered invalid");
1586 Err(StatusCode::BadInvalidArgument)
1587 } else if !self.subscription_exists(subscription_id) {
1588 session_error!(self, "modify_subscription, subscription id does not exist");
1589 Err(StatusCode::BadInvalidArgument)
1590 } else {
1591 let request = ModifySubscriptionRequest {
1592 request_header: self.make_request_header(),
1593 subscription_id,
1594 requested_publishing_interval: publishing_interval,
1595 requested_lifetime_count: lifetime_count,
1596 requested_max_keep_alive_count: max_keep_alive_count,
1597 max_notifications_per_publish,
1598 priority,
1599 };
1600 let response = self.send_request(request)?;
1601 if let SupportedMessage::ModifySubscriptionResponse(response) = response {
1602 process_service_result(&response.response_header)?;
1603 let mut subscription_state = trace_write_lock!(self.subscription_state);
1604 subscription_state.modify_subscription(
1605 subscription_id,
1606 response.revised_publishing_interval,
1607 response.revised_lifetime_count,
1608 response.revised_max_keep_alive_count,
1609 max_notifications_per_publish,
1610 priority,
1611 );
1612 session_debug!(self, "modify_subscription success for {}", subscription_id);
1613 Ok(())
1614 } else {
1615 session_error!(self, "modify_subscription failed {:?}", response);
1616 Err(process_unexpected_response(response))
1617 }
1618 }
1619 }
1620
1621 fn set_publishing_mode(
1622 &self,
1623 subscription_ids: &[u32],
1624 publishing_enabled: bool,
1625 ) -> Result<Vec<StatusCode>, StatusCode> {
1626 session_debug!(
1627 self,
1628 "set_publishing_mode, for subscriptions {:?}, publishing enabled {}",
1629 subscription_ids,
1630 publishing_enabled
1631 );
1632 if subscription_ids.is_empty() {
1633 session_error!(
1635 self,
1636 "set_publishing_mode, no subscription ids were provided"
1637 );
1638 Err(StatusCode::BadNothingToDo)
1639 } else {
1640 let request = SetPublishingModeRequest {
1641 request_header: self.make_request_header(),
1642 publishing_enabled,
1643 subscription_ids: Some(subscription_ids.to_vec()),
1644 };
1645 let response = self.send_request(request)?;
1646 if let SupportedMessage::SetPublishingModeResponse(response) = response {
1647 process_service_result(&response.response_header)?;
1648 {
1649 let mut subscription_state = trace_write_lock!(self.subscription_state);
1651 subscription_state.set_publishing_mode(subscription_ids, publishing_enabled);
1652 }
1653 session_debug!(self, "set_publishing_mode success");
1654 Ok(response.results.unwrap())
1655 } else {
1656 session_error!(self, "set_publishing_mode failed {:?}", response);
1657 Err(process_unexpected_response(response))
1658 }
1659 }
1660 }
1661
1662 fn transfer_subscriptions(
1663 &self,
1664 subscription_ids: &[u32],
1665 send_initial_values: bool,
1666 ) -> Result<Vec<TransferResult>, StatusCode> {
1667 if subscription_ids.is_empty() {
1668 session_error!(
1670 self,
1671 "set_publishing_mode, no subscription ids were provided"
1672 );
1673 Err(StatusCode::BadNothingToDo)
1674 } else {
1675 let request = TransferSubscriptionsRequest {
1676 request_header: self.make_request_header(),
1677 subscription_ids: Some(subscription_ids.to_vec()),
1678 send_initial_values,
1679 };
1680 let response = self.send_request(request)?;
1681 if let SupportedMessage::TransferSubscriptionsResponse(response) = response {
1682 process_service_result(&response.response_header)?;
1683 session_debug!(self, "transfer_subscriptions success");
1684 Ok(response.results.unwrap())
1685 } else {
1686 session_error!(self, "transfer_subscriptions failed {:?}", response);
1687 Err(process_unexpected_response(response))
1688 }
1689 }
1690 }
1691
1692 fn delete_subscription(&self, subscription_id: u32) -> Result<StatusCode, StatusCode> {
1693 if subscription_id == 0 {
1694 session_error!(self, "delete_subscription, subscription id 0 is invalid");
1695 Err(StatusCode::BadInvalidArgument)
1696 } else if !self.subscription_exists(subscription_id) {
1697 session_error!(
1698 self,
1699 "delete_subscription, subscription id {} does not exist",
1700 subscription_id
1701 );
1702 Err(StatusCode::BadInvalidArgument)
1703 } else {
1704 let result = self.delete_subscriptions(&[subscription_id][..])?;
1705 Ok(result[0])
1706 }
1707 }
1708
1709 fn delete_subscriptions(
1710 &self,
1711 subscription_ids: &[u32],
1712 ) -> Result<Vec<StatusCode>, StatusCode> {
1713 if subscription_ids.is_empty() {
1714 session_trace!(self, "delete_subscriptions with no subscriptions");
1716 Err(StatusCode::BadNothingToDo)
1717 } else {
1718 let request = DeleteSubscriptionsRequest {
1720 request_header: self.make_request_header(),
1721 subscription_ids: Some(subscription_ids.to_vec()),
1722 };
1723 let response = self.send_request(request)?;
1724 if let SupportedMessage::DeleteSubscriptionsResponse(response) = response {
1725 process_service_result(&response.response_header)?;
1726 {
1727 let mut subscription_state = trace_write_lock!(self.subscription_state);
1729 subscription_ids.iter().for_each(|id| {
1730 let _ = subscription_state.delete_subscription(*id);
1731 });
1732 }
1733 session_debug!(self, "delete_subscriptions success");
1734 Ok(response.results.unwrap())
1735 } else {
1736 session_error!(self, "delete_subscriptions failed {:?}", response);
1737 Err(process_unexpected_response(response))
1738 }
1739 }
1740 }
1741}
1742
1743impl NodeManagementService for Session {
1744 fn add_nodes(&self, nodes_to_add: &[AddNodesItem]) -> Result<Vec<AddNodesResult>, StatusCode> {
1745 if nodes_to_add.is_empty() {
1746 session_error!(self, "add_nodes, called with no nodes to add");
1747 Err(StatusCode::BadNothingToDo)
1748 } else {
1749 let request = AddNodesRequest {
1750 request_header: self.make_request_header(),
1751 nodes_to_add: Some(nodes_to_add.to_vec()),
1752 };
1753 let response = self.send_request(request)?;
1754 if let SupportedMessage::AddNodesResponse(response) = response {
1755 Ok(response.results.unwrap())
1756 } else {
1757 Err(process_unexpected_response(response))
1758 }
1759 }
1760 }
1761
1762 fn add_references(
1763 &self,
1764 references_to_add: &[AddReferencesItem],
1765 ) -> Result<Vec<StatusCode>, StatusCode> {
1766 if references_to_add.is_empty() {
1767 session_error!(self, "add_references, called with no references to add");
1768 Err(StatusCode::BadNothingToDo)
1769 } else {
1770 let request = AddReferencesRequest {
1771 request_header: self.make_request_header(),
1772 references_to_add: Some(references_to_add.to_vec()),
1773 };
1774 let response = self.send_request(request)?;
1775 if let SupportedMessage::AddReferencesResponse(response) = response {
1776 Ok(response.results.unwrap())
1777 } else {
1778 Err(process_unexpected_response(response))
1779 }
1780 }
1781 }
1782
1783 fn delete_nodes(
1784 &self,
1785 nodes_to_delete: &[DeleteNodesItem],
1786 ) -> Result<Vec<StatusCode>, StatusCode> {
1787 if nodes_to_delete.is_empty() {
1788 session_error!(self, "delete_nodes, called with no nodes to delete");
1789 Err(StatusCode::BadNothingToDo)
1790 } else {
1791 let request = DeleteNodesRequest {
1792 request_header: self.make_request_header(),
1793 nodes_to_delete: Some(nodes_to_delete.to_vec()),
1794 };
1795 let response = self.send_request(request)?;
1796 if let SupportedMessage::DeleteNodesResponse(response) = response {
1797 Ok(response.results.unwrap())
1798 } else {
1799 Err(process_unexpected_response(response))
1800 }
1801 }
1802 }
1803
1804 fn delete_references(
1805 &self,
1806 references_to_delete: &[DeleteReferencesItem],
1807 ) -> Result<Vec<StatusCode>, StatusCode> {
1808 if references_to_delete.is_empty() {
1809 session_error!(
1810 self,
1811 "delete_references, called with no references to delete"
1812 );
1813 Err(StatusCode::BadNothingToDo)
1814 } else {
1815 let request = DeleteReferencesRequest {
1816 request_header: self.make_request_header(),
1817 references_to_delete: Some(references_to_delete.to_vec()),
1818 };
1819 let response = self.send_request(request)?;
1820 if let SupportedMessage::DeleteReferencesResponse(response) = response {
1821 Ok(response.results.unwrap())
1822 } else {
1823 Err(process_unexpected_response(response))
1824 }
1825 }
1826 }
1827}
1828
1829impl MonitoredItemService for Session {
1830 fn create_monitored_items(
1831 &self,
1832 subscription_id: u32,
1833 timestamps_to_return: TimestampsToReturn,
1834 items_to_create: &[MonitoredItemCreateRequest],
1835 ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
1836 session_debug!(
1837 self,
1838 "create_monitored_items, for subscription {}, {} items",
1839 subscription_id,
1840 items_to_create.len()
1841 );
1842 if subscription_id == 0 {
1843 session_error!(self, "create_monitored_items, subscription id 0 is invalid");
1844 Err(StatusCode::BadInvalidArgument)
1845 } else if !self.subscription_exists(subscription_id) {
1846 session_error!(
1847 self,
1848 "create_monitored_items, subscription id {} does not exist",
1849 subscription_id
1850 );
1851 Err(StatusCode::BadInvalidArgument)
1852 } else if items_to_create.is_empty() {
1853 session_error!(
1854 self,
1855 "create_monitored_items, called with no items to create"
1856 );
1857 Err(StatusCode::BadNothingToDo)
1858 } else {
1859 let mut items_to_create = items_to_create.to_vec();
1861 {
1862 let mut session_state = trace_write_lock!(self.session_state);
1863 items_to_create.iter_mut().for_each(|i| {
1864 if i.requested_parameters.client_handle == 0 {
1866 i.requested_parameters.client_handle =
1867 session_state.next_monitored_item_handle();
1868 }
1869 });
1870 }
1871
1872 let request = CreateMonitoredItemsRequest {
1873 request_header: self.make_request_header(),
1874 subscription_id,
1875 timestamps_to_return,
1876 items_to_create: Some(items_to_create.clone()),
1877 };
1878 let response = self.send_request(request)?;
1879 if let SupportedMessage::CreateMonitoredItemsResponse(response) = response {
1880 process_service_result(&response.response_header)?;
1881 if let Some(ref results) = response.results {
1882 session_debug!(
1883 self,
1884 "create_monitored_items, {} items created",
1885 items_to_create.len()
1886 );
1887 let items_to_create = items_to_create
1889 .iter()
1890 .zip(results)
1891 .map(|(i, r)| subscription::CreateMonitoredItem {
1892 id: r.monitored_item_id,
1893 client_handle: i.requested_parameters.client_handle,
1894 discard_oldest: i.requested_parameters.discard_oldest,
1895 item_to_monitor: i.item_to_monitor.clone(),
1896 monitoring_mode: i.monitoring_mode,
1897 queue_size: r.revised_queue_size,
1898 sampling_interval: r.revised_sampling_interval,
1899 })
1900 .collect::<Vec<subscription::CreateMonitoredItem>>();
1901 {
1902 let mut subscription_state = trace_write_lock!(self.subscription_state);
1903 subscription_state
1904 .insert_monitored_items(subscription_id, &items_to_create);
1905 }
1906 } else {
1907 session_debug!(
1908 self,
1909 "create_monitored_items, success but no monitored items were created"
1910 );
1911 }
1912 Ok(response.results.unwrap())
1913 } else {
1914 session_error!(self, "create_monitored_items failed {:?}", response);
1915 Err(process_unexpected_response(response))
1916 }
1917 }
1918 }
1919
1920 fn modify_monitored_items(
1921 &self,
1922 subscription_id: u32,
1923 timestamps_to_return: TimestampsToReturn,
1924 items_to_modify: &[MonitoredItemModifyRequest],
1925 ) -> Result<Vec<MonitoredItemModifyResult>, StatusCode> {
1926 session_debug!(
1927 self,
1928 "modify_monitored_items, for subscription {}, {} items",
1929 subscription_id,
1930 items_to_modify.len()
1931 );
1932 if subscription_id == 0 {
1933 session_error!(self, "modify_monitored_items, subscription id 0 is invalid");
1934 Err(StatusCode::BadInvalidArgument)
1935 } else if !self.subscription_exists(subscription_id) {
1936 session_error!(
1937 self,
1938 "modify_monitored_items, subscription id {} does not exist",
1939 subscription_id
1940 );
1941 Err(StatusCode::BadInvalidArgument)
1942 } else if items_to_modify.is_empty() {
1943 session_error!(
1944 self,
1945 "modify_monitored_items, called with no items to modify"
1946 );
1947 Err(StatusCode::BadNothingToDo)
1948 } else {
1949 let monitored_item_ids = items_to_modify
1950 .iter()
1951 .map(|i| i.monitored_item_id)
1952 .collect::<Vec<u32>>();
1953 let request = ModifyMonitoredItemsRequest {
1954 request_header: self.make_request_header(),
1955 subscription_id,
1956 timestamps_to_return,
1957 items_to_modify: Some(items_to_modify.to_vec()),
1958 };
1959 let response = self.send_request(request)?;
1960 if let SupportedMessage::ModifyMonitoredItemsResponse(response) = response {
1961 process_service_result(&response.response_header)?;
1962 if let Some(ref results) = response.results {
1963 let items_to_modify = monitored_item_ids
1965 .iter()
1966 .zip(results.iter())
1967 .map(|(id, r)| subscription::ModifyMonitoredItem {
1968 id: *id,
1969 queue_size: r.revised_queue_size,
1970 sampling_interval: r.revised_sampling_interval,
1971 })
1972 .collect::<Vec<subscription::ModifyMonitoredItem>>();
1973 {
1974 let mut subscription_state = trace_write_lock!(self.subscription_state);
1975 subscription_state
1976 .modify_monitored_items(subscription_id, &items_to_modify);
1977 }
1978 }
1979 session_debug!(self, "modify_monitored_items, success");
1980 Ok(response.results.unwrap())
1981 } else {
1982 session_error!(self, "modify_monitored_items failed {:?}", response);
1983 Err(process_unexpected_response(response))
1984 }
1985 }
1986 }
1987
1988 fn set_monitoring_mode(
1989 &self,
1990 subscription_id: u32,
1991 monitoring_mode: MonitoringMode,
1992 monitored_item_ids: &[u32],
1993 ) -> Result<Vec<StatusCode>, StatusCode> {
1994 if monitored_item_ids.is_empty() {
1995 session_error!(self, "set_monitoring_mode, called with nothing to do");
1996 Err(StatusCode::BadNothingToDo)
1997 } else {
1998 let request = {
1999 let monitored_item_ids = Some(monitored_item_ids.to_vec());
2000 SetMonitoringModeRequest {
2001 request_header: self.make_request_header(),
2002 subscription_id,
2003 monitoring_mode,
2004 monitored_item_ids,
2005 }
2006 };
2007 let response = self.send_request(request)?;
2008 if let SupportedMessage::SetMonitoringModeResponse(response) = response {
2009 Ok(response.results.unwrap())
2010 } else {
2011 session_error!(self, "set_monitoring_mode failed {:?}", response);
2012 Err(process_unexpected_response(response))
2013 }
2014 }
2015 }
2016
2017 fn set_triggering(
2018 &self,
2019 subscription_id: u32,
2020 triggering_item_id: u32,
2021 links_to_add: &[u32],
2022 links_to_remove: &[u32],
2023 ) -> Result<(Option<Vec<StatusCode>>, Option<Vec<StatusCode>>), StatusCode> {
2024 if links_to_add.is_empty() && links_to_remove.is_empty() {
2025 session_error!(self, "set_triggering, called with nothing to add or remove");
2026 Err(StatusCode::BadNothingToDo)
2027 } else {
2028 let request = {
2029 let links_to_add = if links_to_add.is_empty() {
2030 None
2031 } else {
2032 Some(links_to_add.to_vec())
2033 };
2034 let links_to_remove = if links_to_remove.is_empty() {
2035 None
2036 } else {
2037 Some(links_to_remove.to_vec())
2038 };
2039 SetTriggeringRequest {
2040 request_header: self.make_request_header(),
2041 subscription_id,
2042 triggering_item_id,
2043 links_to_add,
2044 links_to_remove,
2045 }
2046 };
2047 let response = self.send_request(request)?;
2048 if let SupportedMessage::SetTriggeringResponse(response) = response {
2049 let mut subscription_state = trace_write_lock!(self.subscription_state);
2051 subscription_state.set_triggering(
2052 subscription_id,
2053 triggering_item_id,
2054 links_to_add,
2055 links_to_remove,
2056 );
2057 Ok((response.add_results, response.remove_results))
2058 } else {
2059 session_error!(self, "set_triggering failed {:?}", response);
2060 Err(process_unexpected_response(response))
2061 }
2062 }
2063 }
2064
2065 fn delete_monitored_items(
2066 &self,
2067 subscription_id: u32,
2068 items_to_delete: &[u32],
2069 ) -> Result<Vec<StatusCode>, StatusCode> {
2070 session_debug!(
2071 self,
2072 "delete_monitored_items, subscription {} for {} items",
2073 subscription_id,
2074 items_to_delete.len()
2075 );
2076 if subscription_id == 0 {
2077 session_error!(self, "delete_monitored_items, subscription id 0 is invalid");
2078 Err(StatusCode::BadInvalidArgument)
2079 } else if !self.subscription_exists(subscription_id) {
2080 session_error!(
2081 self,
2082 "delete_monitored_items, subscription id {} does not exist",
2083 subscription_id
2084 );
2085 Err(StatusCode::BadInvalidArgument)
2086 } else if items_to_delete.is_empty() {
2087 session_error!(
2088 self,
2089 "delete_monitored_items, called with no items to delete"
2090 );
2091 Err(StatusCode::BadNothingToDo)
2092 } else {
2093 let request = DeleteMonitoredItemsRequest {
2094 request_header: self.make_request_header(),
2095 subscription_id,
2096 monitored_item_ids: Some(items_to_delete.to_vec()),
2097 };
2098 let response = self.send_request(request)?;
2099 if let SupportedMessage::DeleteMonitoredItemsResponse(response) = response {
2100 process_service_result(&response.response_header)?;
2101 if response.results.is_some() {
2102 let mut subscription_state = trace_write_lock!(self.subscription_state);
2103 subscription_state.delete_monitored_items(subscription_id, items_to_delete);
2104 }
2105 session_debug!(self, "delete_monitored_items, success");
2106 Ok(response.results.unwrap())
2107 } else {
2108 session_error!(self, "delete_monitored_items failed {:?}", response);
2109 Err(process_unexpected_response(response))
2110 }
2111 }
2112 }
2113}
2114
2115impl ViewService for Session {
2116 fn browse(
2117 &self,
2118 nodes_to_browse: &[BrowseDescription],
2119 ) -> Result<Option<Vec<BrowseResult>>, StatusCode> {
2120 if nodes_to_browse.is_empty() {
2121 session_error!(self, "browse, was not supplied with any nodes to browse");
2122 Err(StatusCode::BadNothingToDo)
2123 } else {
2124 let request = BrowseRequest {
2125 request_header: self.make_request_header(),
2126 view: ViewDescription {
2127 view_id: NodeId::null(),
2128 timestamp: DateTime::null(),
2129 view_version: 0,
2130 },
2131 requested_max_references_per_node: 1000,
2132 nodes_to_browse: Some(nodes_to_browse.to_vec()),
2133 };
2134 let response = self.send_request(request)?;
2135 if let SupportedMessage::BrowseResponse(response) = response {
2136 session_debug!(self, "browse, success");
2137 process_service_result(&response.response_header)?;
2138 Ok(response.results)
2139 } else {
2140 session_error!(self, "browse failed {:?}", response);
2141 Err(process_unexpected_response(response))
2142 }
2143 }
2144 }
2145
2146 fn browse_next(
2147 &self,
2148 release_continuation_points: bool,
2149 continuation_points: &[ByteString],
2150 ) -> Result<Option<Vec<BrowseResult>>, StatusCode> {
2151 if continuation_points.is_empty() {
2152 Err(StatusCode::BadNothingToDo)
2153 } else {
2154 let request = BrowseNextRequest {
2155 request_header: self.make_request_header(),
2156 continuation_points: Some(continuation_points.to_vec()),
2157 release_continuation_points,
2158 };
2159 let response = self.send_request(request)?;
2160 if let SupportedMessage::BrowseNextResponse(response) = response {
2161 session_debug!(self, "browse_next, success");
2162 process_service_result(&response.response_header)?;
2163 Ok(response.results)
2164 } else {
2165 session_error!(self, "browse_next failed {:?}", response);
2166 Err(process_unexpected_response(response))
2167 }
2168 }
2169 }
2170
2171 fn translate_browse_paths_to_node_ids(
2172 &self,
2173 browse_paths: &[BrowsePath],
2174 ) -> Result<Vec<BrowsePathResult>, StatusCode> {
2175 if browse_paths.is_empty() {
2176 session_error!(
2177 self,
2178 "translate_browse_paths_to_node_ids, was not supplied with any browse paths"
2179 );
2180 Err(StatusCode::BadNothingToDo)
2181 } else {
2182 let request = TranslateBrowsePathsToNodeIdsRequest {
2183 request_header: self.make_request_header(),
2184 browse_paths: Some(browse_paths.to_vec()),
2185 };
2186 let response = self.send_request(request)?;
2187 if let SupportedMessage::TranslateBrowsePathsToNodeIdsResponse(response) = response {
2188 session_debug!(self, "translate_browse_paths_to_node_ids, success");
2189 process_service_result(&response.response_header)?;
2190 Ok(response.results.unwrap_or_default())
2191 } else {
2192 session_error!(
2193 self,
2194 "translate_browse_paths_to_node_ids failed {:?}",
2195 response
2196 );
2197 Err(process_unexpected_response(response))
2198 }
2199 }
2200 }
2201
2202 fn register_nodes(&self, nodes_to_register: &[NodeId]) -> Result<Vec<NodeId>, StatusCode> {
2203 if nodes_to_register.is_empty() {
2204 session_error!(
2205 self,
2206 "register_nodes, was not supplied with any nodes to register"
2207 );
2208 Err(StatusCode::BadNothingToDo)
2209 } else {
2210 let request = RegisterNodesRequest {
2211 request_header: self.make_request_header(),
2212 nodes_to_register: Some(nodes_to_register.to_vec()),
2213 };
2214 let response = self.send_request(request)?;
2215 if let SupportedMessage::RegisterNodesResponse(response) = response {
2216 session_debug!(self, "register_nodes, success");
2217 process_service_result(&response.response_header)?;
2218 Ok(response.registered_node_ids.unwrap())
2219 } else {
2220 session_error!(self, "register_nodes failed {:?}", response);
2221 Err(process_unexpected_response(response))
2222 }
2223 }
2224 }
2225
2226 fn unregister_nodes(&self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode> {
2227 if nodes_to_unregister.is_empty() {
2228 session_error!(
2229 self,
2230 "unregister_nodes, was not supplied with any nodes to unregister"
2231 );
2232 Err(StatusCode::BadNothingToDo)
2233 } else {
2234 let request = UnregisterNodesRequest {
2235 request_header: self.make_request_header(),
2236 nodes_to_unregister: Some(nodes_to_unregister.to_vec()),
2237 };
2238 let response = self.send_request(request)?;
2239 if let SupportedMessage::UnregisterNodesResponse(response) = response {
2240 session_debug!(self, "unregister_nodes, success");
2241 process_service_result(&response.response_header)?;
2242 Ok(())
2243 } else {
2244 session_error!(self, "unregister_nodes failed {:?}", response);
2245 Err(process_unexpected_response(response))
2246 }
2247 }
2248 }
2249}
2250
2251impl MethodService for Session {
2252 fn call<T>(&self, method: T) -> Result<CallMethodResult, StatusCode>
2253 where
2254 T: Into<CallMethodRequest>,
2255 {
2256 session_debug!(self, "call()");
2257 let methods_to_call = Some(vec![method.into()]);
2258 let request = CallRequest {
2259 request_header: self.make_request_header(),
2260 methods_to_call,
2261 };
2262 let response = self.send_request(request)?;
2263 if let SupportedMessage::CallResponse(response) = response {
2264 if let Some(mut results) = response.results {
2265 if results.len() != 1 {
2266 session_error!(
2267 self,
2268 "call(), expecting a result from the call to the server, got {} results",
2269 results.len()
2270 );
2271 Err(StatusCode::BadUnexpectedError)
2272 } else {
2273 Ok(results.remove(0))
2274 }
2275 } else {
2276 session_error!(
2277 self,
2278 "call(), expecting a result from the call to the server, got nothing"
2279 );
2280 Err(StatusCode::BadUnexpectedError)
2281 }
2282 } else {
2283 Err(process_unexpected_response(response))
2284 }
2285 }
2286}
2287
2288impl AttributeService for Session {
2289 fn read(
2290 &self,
2291 nodes_to_read: &[ReadValueId],
2292 timestamps_to_return: TimestampsToReturn,
2293 max_age: f64,
2294 ) -> Result<Vec<DataValue>, StatusCode> {
2295 if nodes_to_read.is_empty() {
2296 session_error!(self, "read(), was not supplied with any nodes to read");
2298 Err(StatusCode::BadNothingToDo)
2299 } else {
2300 session_debug!(self, "read() requested to read nodes {:?}", nodes_to_read);
2301 let request = ReadRequest {
2302 request_header: self.make_request_header(),
2303 max_age,
2304 timestamps_to_return,
2305 nodes_to_read: Some(nodes_to_read.to_vec()),
2306 };
2307 let response = self.send_request(request)?;
2308 if let SupportedMessage::ReadResponse(response) = response {
2309 session_debug!(self, "read(), success");
2310 process_service_result(&response.response_header)?;
2311 let results = if let Some(results) = response.results {
2312 results
2313 } else {
2314 Vec::new()
2315 };
2316 Ok(results)
2317 } else {
2318 session_error!(self, "read() value failed");
2319 Err(process_unexpected_response(response))
2320 }
2321 }
2322 }
2323
2324 fn history_read(
2325 &self,
2326 history_read_details: HistoryReadAction,
2327 timestamps_to_return: TimestampsToReturn,
2328 release_continuation_points: bool,
2329 nodes_to_read: &[HistoryReadValueId],
2330 ) -> Result<Vec<HistoryReadResult>, StatusCode> {
2331 let history_read_details = ExtensionObject::from(history_read_details);
2333 let request = HistoryReadRequest {
2334 request_header: self.make_request_header(),
2335 history_read_details,
2336 timestamps_to_return,
2337 release_continuation_points,
2338 nodes_to_read: if nodes_to_read.is_empty() {
2339 None
2340 } else {
2341 Some(nodes_to_read.to_vec())
2342 },
2343 };
2344 session_debug!(
2345 self,
2346 "history_read() requested to read nodes {:?}",
2347 nodes_to_read
2348 );
2349 let response = self.send_request(request)?;
2350 if let SupportedMessage::HistoryReadResponse(response) = response {
2351 session_debug!(self, "history_read(), success");
2352 process_service_result(&response.response_header)?;
2353 let results = if let Some(results) = response.results {
2354 results
2355 } else {
2356 Vec::new()
2357 };
2358 Ok(results)
2359 } else {
2360 session_error!(self, "history_read() value failed");
2361 Err(process_unexpected_response(response))
2362 }
2363 }
2364
2365 fn write(&self, nodes_to_write: &[WriteValue]) -> Result<Vec<StatusCode>, StatusCode> {
2366 if nodes_to_write.is_empty() {
2367 session_error!(self, "write() was not supplied with any nodes to write");
2369 Err(StatusCode::BadNothingToDo)
2370 } else {
2371 let request = WriteRequest {
2372 request_header: self.make_request_header(),
2373 nodes_to_write: Some(nodes_to_write.to_vec()),
2374 };
2375 let response = self.send_request(request)?;
2376 if let SupportedMessage::WriteResponse(response) = response {
2377 session_debug!(self, "write(), success");
2378 process_service_result(&response.response_header)?;
2379 Ok(response.results.unwrap_or_default())
2380 } else {
2381 session_error!(self, "write() failed {:?}", response);
2382 Err(process_unexpected_response(response))
2383 }
2384 }
2385 }
2386
2387 fn history_update(
2388 &self,
2389 history_update_details: &[HistoryUpdateAction],
2390 ) -> Result<Vec<HistoryUpdateResult>, StatusCode> {
2391 if history_update_details.is_empty() {
2392 session_error!(
2394 self,
2395 "history_update(), was not supplied with any detail to update"
2396 );
2397 Err(StatusCode::BadNothingToDo)
2398 } else {
2399 let history_update_details = history_update_details
2401 .iter()
2402 .map(|action| ExtensionObject::from(action))
2403 .collect::<Vec<ExtensionObject>>();
2404
2405 let request = HistoryUpdateRequest {
2406 request_header: self.make_request_header(),
2407 history_update_details: Some(history_update_details.to_vec()),
2408 };
2409 let response = self.send_request(request)?;
2410 if let SupportedMessage::HistoryUpdateResponse(response) = response {
2411 session_debug!(self, "history_update(), success");
2412 process_service_result(&response.response_header)?;
2413 let results = if let Some(results) = response.results {
2414 results
2415 } else {
2416 Vec::new()
2417 };
2418 Ok(results)
2419 } else {
2420 session_error!(self, "history_update() failed {:?}", response);
2421 Err(process_unexpected_response(response))
2422 }
2423 }
2424 }
2425}