1use std::collections::HashMap;
2use std::future::Future;
3use std::io;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use bytes::{Buf, Bytes, BytesMut};
11use log::{debug, trace, warn};
12use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf, WriteHalf};
13use tokio_stream::StreamExt;
14use tokio_util::codec::{Encoder, FramedRead};
15
16use crate::codec::{DecodedFrame, FramingMode, NetconfCodec, extract_message_id_from_bytes};
17use crate::config::Config;
18use crate::error::TransportError;
19use crate::hello::ServerHello;
20use crate::message::{self, DataPayload, RpcReply, RpcReplyBody, ServerMessage};
21use crate::stream::NetconfStream;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24#[repr(u8)]
25pub enum SessionState {
26 Ready = 0,
28 Closing = 1,
30 Closed = 2,
32}
33
34impl SessionState {
35 fn from_u8(v: u8) -> Self {
36 match v {
37 0 => Self::Ready,
38 1 => Self::Closing,
39 _ => Self::Closed,
40 }
41 }
42}
43
44impl std::fmt::Display for SessionState {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::Ready => write!(f, "Ready"),
48 Self::Closing => write!(f, "Closing"),
49 Self::Closed => write!(f, "Closed"),
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
59pub enum DisconnectReason {
60 Eof,
62 TransportError(String),
66 Dropped,
68}
69
70impl std::fmt::Display for DisconnectReason {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 Self::Eof => write!(f, "connection closed by remote"),
74 Self::TransportError(e) => write!(f, "transport error: {e}"),
75 Self::Dropped => write!(f, "session dropped"),
76 }
77 }
78}
79
80#[derive(Debug, Clone, Copy)]
81pub enum Datastore {
82 Running,
83 Candidate,
84 Startup,
85}
86
87impl Datastore {
88 fn as_xml(&self) -> &'static str {
89 match self {
90 Datastore::Running => "<running/>",
91 Datastore::Candidate => "<candidate/>",
92 Datastore::Startup => "<startup/>",
93 }
94 }
95}
96
97enum PendingRpc {
107 Normal(tokio::sync::oneshot::Sender<crate::Result<RpcReply>>),
108 Stream(tokio::sync::mpsc::Sender<crate::Result<Bytes>>),
109}
110
111pub struct RpcFuture {
116 rx: tokio::sync::oneshot::Receiver<crate::Result<RpcReply>>,
117 msg_id: u32,
118 rpc_timeout: Option<Duration>,
119}
120
121impl RpcFuture {
122 pub fn message_id(&self) -> u32 {
124 self.msg_id
125 }
126
127 pub async fn response(self) -> crate::Result<RpcReply> {
133 let result = match self.rpc_timeout {
134 Some(duration) => tokio::time::timeout(duration, self.rx)
135 .await
136 .map_err(|_| crate::Error::Transport(TransportError::Timeout(duration)))?,
137 None => self.rx.await,
138 };
139 result.map_err(|_| crate::Error::SessionClosed)?
140 }
141
142 pub async fn response_with_timeout(self, timeout: Duration) -> crate::Result<RpcReply> {
148 let result = tokio::time::timeout(timeout, self.rx)
149 .await
150 .map_err(|_| crate::Error::Transport(TransportError::Timeout(timeout)))?;
151 result.map_err(|_| crate::Error::SessionClosed)?
152 }
153}
154
155pub struct RpcStream {
184 rx: tokio::sync::mpsc::Receiver<crate::Result<Bytes>>,
185 current: Bytes,
187 msg_id: u32,
188 done: bool,
189}
190
191impl RpcStream {
192 pub fn message_id(&self) -> u32 {
194 self.msg_id
195 }
196
197 pub fn is_done(&self) -> bool {
199 self.done
200 }
201}
202
203impl AsyncRead for RpcStream {
204 fn poll_read(
205 mut self: Pin<&mut Self>,
206 cx: &mut Context<'_>,
207 buf: &mut ReadBuf<'_>,
208 ) -> Poll<io::Result<()>> {
209 if !self.current.is_empty() {
211 let n = std::cmp::min(buf.remaining(), self.current.len());
212 buf.put_slice(&self.current[..n]);
213 self.current.advance(n);
214 return Poll::Ready(Ok(()));
215 }
216
217 if self.done {
219 return Poll::Ready(Ok(()));
220 }
221
222 match self.rx.poll_recv(cx) {
224 Poll::Ready(Some(Ok(chunk))) => {
225 let n = std::cmp::min(buf.remaining(), chunk.len());
226 buf.put_slice(&chunk[..n]);
227 if n < chunk.len() {
228 self.current = chunk.slice(n..);
229 }
230 Poll::Ready(Ok(()))
231 }
232 Poll::Ready(Some(Err(e))) => {
233 self.done = true;
234 Poll::Ready(Err(io::Error::other(e.to_string())))
235 }
236 Poll::Ready(None) => {
237 self.done = true;
239 Poll::Ready(Ok(()))
240 }
241 Poll::Pending => Poll::Pending,
242 }
243 }
244}
245
246struct SessionInner {
251 pending: Mutex<HashMap<u32, PendingRpc>>,
253 state: AtomicU8,
264 disconnect_tx: tokio::sync::watch::Sender<Option<DisconnectReason>>,
268 created_at: Instant,
270 last_rpc_nanos: AtomicU64,
273 active_streams: AtomicUsize,
276}
277
278impl SessionInner {
279 fn state(&self) -> SessionState {
280 SessionState::from_u8(self.state.load(Ordering::Acquire))
281 }
282
283 fn set_state(&self, state: SessionState) {
284 self.state.store(state as u8, Ordering::Release);
285 }
286
287 fn drain_pending(&self) -> usize {
288 let mut pending = self.pending.lock().unwrap();
289 let count = pending.len();
290 for (_, rpc) in pending.drain() {
291 match rpc {
292 PendingRpc::Normal(tx) => {
293 let _ = tx.send(Err(crate::Error::SessionClosed));
294 }
295 PendingRpc::Stream(tx) => {
296 let _ = tx.try_send(Err(crate::Error::SessionClosed));
297 }
298 }
299 }
300 count
301 }
302}
303
304struct WriterState {
307 writer: WriteHalf<NetconfStream>,
308 codec: NetconfCodec,
309}
310
311pub struct Session {
326 writer: tokio::sync::Mutex<WriterState>,
329
330 inner: Arc<SessionInner>,
333
334 server_hello: ServerHello,
336
337 framing: FramingMode,
339
340 rpc_timeout: Option<Duration>,
342
343 stream_buffer_capacity: usize,
345
346 disconnect_rx: tokio::sync::watch::Receiver<Option<DisconnectReason>>,
349
350 connected_since: Instant,
352
353 _reader_handle: tokio::task::JoinHandle<()>,
358
359 _keep_alive: Option<Box<dyn std::any::Any + Send + Sync>>,
362}
363
364impl Drop for Session {
365 fn drop(&mut self) {
366 let drained = self.inner.drain_pending();
368 if drained > 0 {
369 debug!(
370 "session {}: drop: drained {drained} pending RPCs",
371 self.server_hello.session_id
372 );
373 }
374 self.inner.set_state(SessionState::Closed);
376 self.inner.disconnect_tx.send_if_modified(|current| {
378 if current.is_none() {
379 *current = Some(DisconnectReason::Dropped);
380 true
381 } else {
382 false
383 }
384 });
385 self._reader_handle.abort();
387 }
388}
389
390impl Session {
391 pub async fn connect(
393 host: &str,
394 port: u16,
395 username: &str,
396 password: &str,
397 ) -> crate::Result<Self> {
398 Self::connect_with_config(host, port, username, password, Config::default()).await
399 }
400
401 pub async fn connect_with_config(
403 host: &str,
404 port: u16,
405 username: &str,
406 password: &str,
407 config: Config,
408 ) -> crate::Result<Self> {
409 let (mut stream, keep_alive) =
410 crate::transport::connect(host, port, username, password, &config).await?;
411 let (server_hello, framing) = exchange_hello(&mut stream, &config).await?;
412 Self::build(stream, Some(keep_alive), server_hello, framing, config)
413 }
414
415 pub async fn from_stream<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
417 stream: S,
418 ) -> crate::Result<Self> {
419 Self::from_stream_with_config(stream, Config::default()).await
420 }
421
422 pub async fn from_stream_with_config<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
424 mut stream: S,
425 config: Config,
426 ) -> crate::Result<Self> {
427 let (server_hello, framing) = exchange_hello(&mut stream, &config).await?;
428 let boxed: NetconfStream = Box::new(stream);
429 Self::build(boxed, None, server_hello, framing, config)
430 }
431
432 fn build(
433 stream: NetconfStream,
434 keep_alive: Option<Box<dyn std::any::Any + Send + Sync>>,
435 server_hello: ServerHello,
436 framing: FramingMode,
437 config: Config,
438 ) -> crate::Result<Self> {
439 debug!(
440 "session {}: building (framing={:?}, capabilities={})",
441 server_hello.session_id,
442 framing,
443 server_hello.capabilities.len()
444 );
445 let (read_half, write_half) = tokio::io::split(stream);
446
447 let read_codec = NetconfCodec::new(framing, config.codec);
448 let write_codec = NetconfCodec::new(framing, config.codec);
449 let reader = FramedRead::new(read_half, read_codec);
450
451 let (disconnect_tx, disconnect_rx) = tokio::sync::watch::channel(None);
452
453 let inner = Arc::new(SessionInner {
454 pending: Mutex::new(HashMap::new()),
455 state: AtomicU8::new(SessionState::Ready as u8),
456 disconnect_tx,
457 created_at: Instant::now(),
458 last_rpc_nanos: AtomicU64::new(0),
459 active_streams: AtomicUsize::new(0),
460 });
461
462 let reader_inner = Arc::clone(&inner);
463 let session_id = server_hello.session_id;
464 let reader_handle = tokio::spawn(async move {
465 reader_loop(reader, reader_inner, session_id).await;
466 });
467
468 Ok(Self {
469 writer: tokio::sync::Mutex::new(WriterState {
470 writer: write_half,
471 codec: write_codec,
472 }),
473 inner,
474 server_hello,
475 framing,
476 rpc_timeout: config.rpc_timeout,
477 stream_buffer_capacity: config.stream_buffer_capacity,
478 disconnect_rx,
479 connected_since: Instant::now(),
480 _reader_handle: reader_handle,
481 _keep_alive: keep_alive,
482 })
483 }
484
485 pub fn session_id(&self) -> u32 {
486 self.server_hello.session_id
487 }
488
489 pub fn server_capabilities(&self) -> &[String] {
490 &self.server_hello.capabilities
491 }
492
493 pub fn framing_mode(&self) -> FramingMode {
494 self.framing
495 }
496
497 pub fn state(&self) -> SessionState {
498 self.inner.state()
499 }
500
501 pub fn disconnected(&self) -> impl Future<Output = DisconnectReason> + Send + 'static {
518 let mut rx = self.disconnect_rx.clone();
519 async move {
520 if let Some(reason) = rx.borrow_and_update().clone() {
522 return reason;
523 }
524 loop {
527 if rx.changed().await.is_err() {
528 return DisconnectReason::Dropped;
529 }
530 if let Some(reason) = rx.borrow_and_update().clone() {
531 return reason;
532 }
533 }
534 }
535 }
536
537 fn check_state(&self) -> crate::Result<()> {
538 let state = self.inner.state();
539 if state != SessionState::Ready {
540 return Err(crate::Error::InvalidState(state.to_string()));
541 }
542 Ok(())
543 }
544
545 async fn send_encoded(&self, xml: &str) -> crate::Result<()> {
548 let mut buf = BytesMut::new();
549 let mut state = self.writer.lock().await;
550 state.codec.encode(Bytes::from(xml.to_string()), &mut buf)?;
551 trace!(
552 "session {}: writing {} bytes to stream",
553 self.server_hello.session_id,
554 buf.len()
555 );
556 state.writer.write_all(&buf).await?;
557 state.writer.flush().await?;
558 Ok(())
559 }
560
561 pub async fn rpc_send(&self, inner_xml: &str) -> crate::Result<RpcFuture> {
568 self.check_state()?;
569 let (msg_id, xml) = message::build_rpc(inner_xml);
570 debug!(
571 "session {}: sending rpc message-id={} ({} bytes)",
572 self.server_hello.session_id,
573 msg_id,
574 xml.len()
575 );
576 trace!(
577 "session {}: rpc content: {}",
578 self.server_hello.session_id, inner_xml
579 );
580 let (tx, rx) = tokio::sync::oneshot::channel();
581
582 self.inner
583 .pending
584 .lock()
585 .unwrap()
586 .insert(msg_id, PendingRpc::Normal(tx));
587
588 if let Err(e) = self.send_encoded(&xml).await {
589 debug!(
590 "session {}: send failed for message-id={}: {}",
591 self.server_hello.session_id, msg_id, e
592 );
593 self.inner.pending.lock().unwrap().remove(&msg_id);
594 return Err(e);
595 }
596 Ok(RpcFuture {
597 rx,
598 msg_id,
599 rpc_timeout: self.rpc_timeout,
600 })
601 }
602
603 pub async fn rpc_raw(&self, inner_xml: &str) -> crate::Result<RpcReply> {
605 let future = self.rpc_send(inner_xml).await?;
606 future.response().await
607 }
608
609 pub async fn rpc_stream(&self, inner_xml: &str) -> crate::Result<RpcStream> {
645 self.check_state()?;
646 let (msg_id, xml) = message::build_rpc(inner_xml);
647 debug!(
648 "session {}: sending streaming rpc message-id={} ({} bytes)",
649 self.server_hello.session_id,
650 msg_id,
651 xml.len()
652 );
653
654 let (tx, rx) = tokio::sync::mpsc::channel(self.stream_buffer_capacity);
655
656 self.inner
657 .pending
658 .lock()
659 .unwrap()
660 .insert(msg_id, PendingRpc::Stream(tx));
661
662 if let Err(e) = self.send_encoded(&xml).await {
663 debug!(
664 "session {}: send failed for streaming message-id={}: {}",
665 self.server_hello.session_id, msg_id, e
666 );
667 self.inner.pending.lock().unwrap().remove(&msg_id);
668 return Err(e);
669 }
670
671 Ok(RpcStream {
672 rx,
673 current: Bytes::new(),
674 msg_id,
675 done: false,
676 })
677 }
678
679 async fn rpc_send_unchecked(&self, inner_xml: &str) -> crate::Result<RpcFuture> {
681 let (msg_id, xml) = message::build_rpc(inner_xml);
682 let (tx, rx) = tokio::sync::oneshot::channel();
683
684 self.inner
685 .pending
686 .lock()
687 .unwrap()
688 .insert(msg_id, PendingRpc::Normal(tx));
689
690 if let Err(e) = self.send_encoded(&xml).await {
691 self.inner.pending.lock().unwrap().remove(&msg_id);
692 return Err(e);
693 }
694
695 Ok(RpcFuture {
696 rx,
697 msg_id,
698 rpc_timeout: self.rpc_timeout,
699 })
700 }
701
702 pub async fn get_config(
704 &self,
705 source: Datastore,
706 filter: Option<&str>,
707 ) -> crate::Result<String> {
708 let filter_xml = match filter {
709 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
710 None => String::new(),
711 };
712 let inner = format!(
713 "<get-config><source>{}</source>{filter_xml}</get-config>",
714 source.as_xml()
715 );
716 let reply = self.rpc_raw(&inner).await?;
717 reply_to_data(reply)
718 }
719
720 pub async fn get_config_payload(
726 &self,
727 source: Datastore,
728 filter: Option<&str>,
729 ) -> crate::Result<DataPayload> {
730 let filter_xml = match filter {
731 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
732 None => String::new(),
733 };
734 let inner = format!(
735 "<get-config><source>{}</source>{filter_xml}</get-config>",
736 source.as_xml()
737 );
738 let reply = self.rpc_raw(&inner).await?;
739 reply.into_data()
740 }
741
742 pub async fn get_config_stream(
747 &self,
748 source: Datastore,
749 filter: Option<&str>,
750 ) -> crate::Result<RpcStream> {
751 let filter_xml = match filter {
752 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
753 None => String::new(),
754 };
755 let inner = format!(
756 "<get-config><source>{}</source>{filter_xml}</get-config>",
757 source.as_xml()
758 );
759 self.rpc_stream(&inner).await
760 }
761
762 pub async fn get(&self, filter: Option<&str>) -> crate::Result<String> {
764 let filter_xml = match filter {
765 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
766 None => String::new(),
767 };
768 let inner = format!("<get>{filter_xml}</get>");
769 let reply = self.rpc_raw(&inner).await?;
770 reply_to_data(reply)
771 }
772
773 pub async fn get_payload(&self, filter: Option<&str>) -> crate::Result<DataPayload> {
778 let filter_xml = match filter {
779 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
780 None => String::new(),
781 };
782 let inner = format!("<get>{filter_xml}</get>");
783 let reply = self.rpc_raw(&inner).await?;
784 reply.into_data()
785 }
786
787 pub async fn get_stream(&self, filter: Option<&str>) -> crate::Result<RpcStream> {
792 let filter_xml = match filter {
793 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
794 None => String::new(),
795 };
796 let inner = format!("<get>{filter_xml}</get>");
797 self.rpc_stream(&inner).await
798 }
799
800 pub async fn edit_config(&self, target: Datastore, config: &str) -> crate::Result<()> {
802 let inner = format!(
803 "<edit-config><target>{}</target><config>{config}</config></edit-config>",
804 target.as_xml()
805 );
806 let reply = self.rpc_raw(&inner).await?;
807 reply_to_ok(reply)
808 }
809
810 pub async fn lock(&self, target: Datastore) -> crate::Result<()> {
812 let inner = format!("<lock><target>{}</target></lock>", target.as_xml());
813 let reply = self.rpc_raw(&inner).await?;
814 reply_to_ok(reply)
815 }
816
817 pub async fn unlock(&self, target: Datastore) -> crate::Result<()> {
819 let inner = format!("<unlock><target>{}</target></unlock>", target.as_xml());
820 let reply = self.rpc_raw(&inner).await?;
821 reply_to_ok(reply)
822 }
823
824 pub async fn commit(&self) -> crate::Result<()> {
826 let reply = self.rpc_raw("<commit/>").await?;
827 reply_to_ok(reply)
828 }
829
830 pub async fn close_session(&self) -> crate::Result<()> {
832 let prev = self.inner.state.compare_exchange(
835 SessionState::Ready as u8,
836 SessionState::Closing as u8,
837 Ordering::AcqRel,
838 Ordering::Acquire,
839 );
840 if let Err(current) = prev {
841 let state = SessionState::from_u8(current);
842 return Err(crate::Error::InvalidState(state.to_string()));
843 }
844 debug!("session {}: closing", self.server_hello.session_id);
845 let result = self.rpc_send_unchecked("<close-session/>").await;
846 match result {
847 Ok(future) => {
848 let reply = future.response().await;
849 self.inner.set_state(SessionState::Closed);
850 debug!(
851 "session {}: closed gracefully",
852 self.server_hello.session_id
853 );
854 reply_to_ok(reply?)
855 }
856 Err(e) => {
857 self.inner.set_state(SessionState::Closed);
858 debug!(
859 "session {}: close failed: {}",
860 self.server_hello.session_id, e
861 );
862 Err(e)
863 }
864 }
865 }
866
867 pub async fn close(self) -> crate::Result<()> {
874 let result = self.close_session().await;
875 self.writer.lock().await.writer.shutdown().await.ok();
876 result
878 }
879
880 pub async fn kill_session(&self, session_id: u32) -> crate::Result<()> {
882 let inner = format!("<kill-session><session-id>{session_id}</session-id></kill-session>");
883 let reply = self.rpc_raw(&inner).await?;
884 reply_to_ok(reply)
885 }
886
887 pub fn with_timeout(&self, timeout: Duration) -> SessionWithTimeout<'_> {
890 SessionWithTimeout {
891 session: self,
892 timeout,
893 }
894 }
895
896 pub fn pending_rpc_count(&self) -> usize {
902 self.inner.pending.lock().unwrap().len() + self.inner.active_streams.load(Ordering::Acquire)
903 }
904
905 pub fn last_rpc_at(&self) -> Option<Instant> {
908 let nanos = self.inner.last_rpc_nanos.load(Ordering::Acquire);
909 if nanos == 0 {
910 None
911 } else {
912 Some(self.inner.created_at + Duration::from_nanos(nanos))
913 }
914 }
915
916 pub fn connected_since(&self) -> Instant {
919 self.connected_since
920 }
921}
922
923pub struct SessionWithTimeout<'a> {
928 session: &'a Session,
929 timeout: Duration,
930}
931
932impl SessionWithTimeout<'_> {
933 pub async fn rpc_raw(&self, inner_xml: &str) -> crate::Result<RpcReply> {
935 let future = self.session.rpc_send(inner_xml).await?;
936 future.response_with_timeout(self.timeout).await
937 }
938
939 pub async fn get_config(
941 &self,
942 source: Datastore,
943 filter: Option<&str>,
944 ) -> crate::Result<String> {
945 let filter_xml = match filter {
946 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
947 None => String::new(),
948 };
949 let inner = format!(
950 "<get-config><source>{}</source>{filter_xml}</get-config>",
951 source.as_xml()
952 );
953 let reply = self.rpc_raw(&inner).await?;
954 reply_to_data(reply)
955 }
956
957 pub async fn get_config_payload(
959 &self,
960 source: Datastore,
961 filter: Option<&str>,
962 ) -> crate::Result<DataPayload> {
963 let filter_xml = match filter {
964 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
965 None => String::new(),
966 };
967 let inner = format!(
968 "<get-config><source>{}</source>{filter_xml}</get-config>",
969 source.as_xml()
970 );
971 let reply = self.rpc_raw(&inner).await?;
972 reply.into_data()
973 }
974
975 pub async fn get(&self, filter: Option<&str>) -> crate::Result<String> {
977 let filter_xml = match filter {
978 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
979 None => String::new(),
980 };
981 let inner = format!("<get>{filter_xml}</get>");
982 let reply = self.rpc_raw(&inner).await?;
983 reply_to_data(reply)
984 }
985
986 pub async fn get_payload(&self, filter: Option<&str>) -> crate::Result<DataPayload> {
988 let filter_xml = match filter {
989 Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
990 None => String::new(),
991 };
992 let inner = format!("<get>{filter_xml}</get>");
993 let reply = self.rpc_raw(&inner).await?;
994 reply.into_data()
995 }
996
997 pub async fn edit_config(&self, target: Datastore, config: &str) -> crate::Result<()> {
999 let inner = format!(
1000 "<edit-config><target>{}</target><config>{config}</config></edit-config>",
1001 target.as_xml()
1002 );
1003 let reply = self.rpc_raw(&inner).await?;
1004 reply_to_ok(reply)
1005 }
1006
1007 pub async fn lock(&self, target: Datastore) -> crate::Result<()> {
1009 let inner = format!("<lock><target>{}</target></lock>", target.as_xml());
1010 let reply = self.rpc_raw(&inner).await?;
1011 reply_to_ok(reply)
1012 }
1013
1014 pub async fn unlock(&self, target: Datastore) -> crate::Result<()> {
1016 let inner = format!("<unlock><target>{}</target></unlock>", target.as_xml());
1017 let reply = self.rpc_raw(&inner).await?;
1018 reply_to_ok(reply)
1019 }
1020
1021 pub async fn commit(&self) -> crate::Result<()> {
1023 let reply = self.rpc_raw("<commit/>").await?;
1024 reply_to_ok(reply)
1025 }
1026}
1027
1028async fn exchange_hello<S: AsyncRead + AsyncWrite + Unpin>(
1030 stream: &mut S,
1031 config: &Config,
1032) -> crate::Result<(ServerHello, FramingMode)> {
1033 let fut = crate::hello::exchange(stream, config.codec.max_message_size);
1034 match config.hello_timeout {
1035 Some(duration) => tokio::time::timeout(duration, fut)
1036 .await
1037 .map_err(|_| crate::Error::Transport(TransportError::Timeout(duration)))?,
1038 None => fut.await,
1039 }
1040}
1041
1042enum ReaderMessageState {
1052 AwaitingHeader { buf: BytesMut },
1055 Accumulating { msg_id: u32, buf: BytesMut },
1057 Streaming {
1059 msg_id: u32,
1060 tx: tokio::sync::mpsc::Sender<crate::Result<Bytes>>,
1061 },
1062}
1063
1064async fn reader_loop(
1075 mut reader: FramedRead<ReadHalf<NetconfStream>, NetconfCodec>,
1076 inner: Arc<SessionInner>,
1077 session_id: u32,
1078) {
1079 debug!("session {}: reader loop started", session_id);
1080 let mut disconnect_reason = DisconnectReason::Eof;
1081 let mut state = ReaderMessageState::AwaitingHeader {
1082 buf: BytesMut::new(),
1083 };
1084
1085 loop {
1086 if inner.state() == SessionState::Closing {
1088 reader.decoder_mut().set_closing();
1089 }
1090 let Some(result) = reader.next().await else {
1091 break;
1092 };
1093 match result {
1094 Ok(frame) => {
1095 state = process_frame(frame, state, &inner, session_id).await;
1096 }
1097 Err(e) => {
1098 debug!("session {}: reader error: {e}", session_id);
1099 disconnect_reason = DisconnectReason::TransportError(e.to_string());
1100
1101 if let ReaderMessageState::Streaming { tx, .. } = &state {
1103 let _ = tx.try_send(Err(crate::Error::SessionClosed));
1104 }
1105
1106 let drained = inner.drain_pending();
1107 if drained > 0 {
1108 debug!(
1109 "session {}: drained {} pending RPCs after error",
1110 session_id, drained
1111 );
1112 }
1113 break;
1114 }
1115 }
1116 }
1117
1118 if let ReaderMessageState::Streaming { tx, .. } = &state {
1120 let _ = tx.try_send(Err(crate::Error::SessionClosed));
1121 }
1122
1123 {
1125 let drained = inner.drain_pending();
1126 if drained > 0 {
1127 debug!(
1128 "session {}: drained {} pending RPCs on stream close",
1129 session_id, drained
1130 );
1131 }
1132 }
1133
1134 inner.set_state(SessionState::Closed);
1135 let _ = inner.disconnect_tx.send(Some(disconnect_reason));
1136 debug!("session {}: reader loop ended", session_id);
1137}
1138
1139async fn process_frame(
1141 frame: DecodedFrame,
1142 state: ReaderMessageState,
1143 inner: &SessionInner,
1144 session_id: u32,
1145) -> ReaderMessageState {
1146 match frame {
1147 DecodedFrame::Chunk(chunk) => match state {
1148 ReaderMessageState::AwaitingHeader { mut buf } => {
1149 buf.extend_from_slice(&chunk);
1150
1151 if let Some(msg_id) = extract_message_id_from_bytes(&buf) {
1153 let is_stream = {
1155 let pending = inner.pending.lock().unwrap();
1156 matches!(pending.get(&msg_id), Some(PendingRpc::Stream(_)))
1157 };
1158
1159 if is_stream {
1160 let tx = {
1163 let mut pending = inner.pending.lock().unwrap();
1164 match pending.remove(&msg_id) {
1165 Some(PendingRpc::Stream(tx)) => tx,
1166 _ => {
1169 return ReaderMessageState::Accumulating { msg_id, buf };
1170 }
1171 }
1172 };
1173 inner.active_streams.fetch_add(1, Ordering::Release);
1174 let _ = tx.send(Ok(buf.freeze())).await;
1176 debug!(
1177 "session {}: streaming rpc message-id={}",
1178 session_id, msg_id
1179 );
1180 ReaderMessageState::Streaming { msg_id, tx }
1181 } else {
1182 ReaderMessageState::Accumulating { msg_id, buf }
1184 }
1185 } else {
1186 ReaderMessageState::AwaitingHeader { buf }
1188 }
1189 }
1190 ReaderMessageState::Accumulating { msg_id, mut buf } => {
1191 buf.extend_from_slice(&chunk);
1192 ReaderMessageState::Accumulating { msg_id, buf }
1193 }
1194 ReaderMessageState::Streaming { msg_id, tx } => {
1195 let _ = tx.send(Ok(chunk)).await;
1199 ReaderMessageState::Streaming { msg_id, tx }
1200 }
1201 },
1202
1203 DecodedFrame::EndOfMessage => match state {
1204 ReaderMessageState::AwaitingHeader { .. } => {
1205 trace!("session {}: empty or unparseable message", session_id);
1207 ReaderMessageState::AwaitingHeader {
1208 buf: BytesMut::new(),
1209 }
1210 }
1211 ReaderMessageState::Accumulating { msg_id, buf } => {
1212 let bytes = buf.freeze();
1214 trace!(
1215 "session {}: complete message for msg-id={} ({} bytes)",
1216 session_id,
1217 msg_id,
1218 bytes.len()
1219 );
1220
1221 match message::classify_message(bytes) {
1222 Ok(ServerMessage::RpcReply(reply)) => {
1223 debug!(
1224 "session {}: received rpc-reply message-id={}",
1225 session_id, reply.message_id
1226 );
1227 let tx = {
1228 let mut pending = inner.pending.lock().unwrap();
1229 pending.remove(&reply.message_id)
1230 };
1231 if let Some(PendingRpc::Normal(tx)) = tx {
1232 let nanos = inner.created_at.elapsed().as_nanos() as u64;
1233 inner.last_rpc_nanos.store(nanos, Ordering::Release);
1234 let _ = tx.send(Ok(reply));
1235 } else {
1236 warn!(
1237 "session {}: received reply for unknown message-id {}",
1238 session_id, reply.message_id
1239 );
1240 }
1241 }
1242 Err(e) => {
1243 warn!("session {}: failed to classify message: {e}", session_id);
1244 }
1245 }
1246
1247 ReaderMessageState::AwaitingHeader {
1248 buf: BytesMut::new(),
1249 }
1250 }
1251 ReaderMessageState::Streaming { msg_id, tx } => {
1252 drop(tx);
1254 inner.active_streams.fetch_sub(1, Ordering::Release);
1255 let nanos = inner.created_at.elapsed().as_nanos() as u64;
1256 inner.last_rpc_nanos.store(nanos, Ordering::Release);
1257 debug!(
1258 "session {}: streaming message complete for msg-id={}",
1259 session_id, msg_id
1260 );
1261 ReaderMessageState::AwaitingHeader {
1262 buf: BytesMut::new(),
1263 }
1264 }
1265 },
1266 }
1267}
1268
1269fn reply_to_data(reply: RpcReply) -> crate::Result<String> {
1270 match reply.body {
1271 RpcReplyBody::Data(payload) => Ok(payload.into_string()),
1272 RpcReplyBody::Ok => Ok(String::new()),
1273 RpcReplyBody::Error(errors) => Err(crate::Error::Rpc {
1274 message_id: reply.message_id,
1275 error: errors
1276 .first()
1277 .map(|e| e.error_message.clone())
1278 .unwrap_or_default(),
1279 }),
1280 }
1281}
1282
1283fn reply_to_ok(reply: RpcReply) -> crate::Result<()> {
1284 match reply.body {
1285 RpcReplyBody::Ok => Ok(()),
1286 RpcReplyBody::Data(_) => Ok(()),
1287 RpcReplyBody::Error(errors) => Err(crate::Error::Rpc {
1288 message_id: reply.message_id,
1289 error: errors
1290 .first()
1291 .map(|e| e.error_message.clone())
1292 .unwrap_or_default(),
1293 }),
1294 }
1295}