// netconf_rust — session.rs
1use std::collections::HashMap;
2use std::future::Future;
3use std::io;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use bytes::{Buf, Bytes, BytesMut};
11use log::{debug, trace, warn};
12use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf, WriteHalf};
13use tokio_stream::StreamExt;
14use tokio_util::codec::{Encoder, FramedRead};
15
16use crate::codec::{DecodedFrame, FramingMode, NetconfCodec, extract_message_id_from_bytes};
17use crate::config::Config;
18use crate::error::TransportError;
19use crate::hello::ServerHello;
20use crate::message::{self, DataPayload, RpcReply, RpcReplyBody, ServerMessage};
21use crate::stream::NetconfStream;
22
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SessionState {
    /// Hello exchange complete, ready for RPCs
    Ready = 0,
    /// A 'close-session' RPC has been sent, awaiting reply
    Closing = 1,
    /// Session terminated gracefully or with error
    Closed = 2,
}

impl SessionState {
    /// Decode the atomic `u8` representation; any unknown value is treated
    /// as `Closed` (the terminal state).
    fn from_u8(v: u8) -> Self {
        if v == 0 {
            Self::Ready
        } else if v == 1 {
            Self::Closing
        } else {
            Self::Closed
        }
    }
}

impl std::fmt::Display for SessionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Ready => "Ready",
            Self::Closing => "Closing",
            Self::Closed => "Closed",
        };
        f.write_str(label)
    }
}
53
/// Reason the session disconnected.
///
/// Delivered via [`Session::disconnected()`] when the background reader
/// task detects that the connection is no longer alive.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// The remote end closed the connection cleanly (TCP FIN / EOF).
    Eof,
    /// A transport error severed the connection.
    ///
    /// Contains the error's display string.
    TransportError(String),
    /// The [`Session`] was dropped without calling [`Session::close_session()`].
    Dropped,
}

impl std::fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TransportError(e) => write!(f, "transport error: {e}"),
            Self::Eof => f.write_str("connection closed by remote"),
            Self::Dropped => f.write_str("session dropped"),
        }
    }
}
79
/// The three standard NETCONF configuration datastores.
#[derive(Debug, Clone, Copy)]
pub enum Datastore {
    Running,
    Candidate,
    Startup,
}

impl Datastore {
    /// The empty XML element naming this datastore inside a
    /// `<source>`/`<target>` wrapper.
    fn as_xml(&self) -> &'static str {
        match self {
            Self::Running => "<running/>",
            Self::Candidate => "<candidate/>",
            Self::Startup => "<startup/>",
        }
    }
}
96
97// =============================================================================
98// PendingRpc — supports both normal (buffered) and streaming RPCs
99// =============================================================================
100
/// A pending RPC awaiting its response from the server.
///
/// Normal RPCs accumulate the full response and deliver it as an `RpcReply`
/// via a oneshot channel. Streaming RPCs forward individual chunks via an
/// mpsc channel, enabling the consumer to process data incrementally.
enum PendingRpc {
    /// Buffered RPC: receives the complete reply (or error) exactly once.
    Normal(tokio::sync::oneshot::Sender<crate::Result<RpcReply>>),
    /// Streaming RPC: receives each raw chunk (or an error) as it arrives.
    Stream(tokio::sync::mpsc::Sender<crate::Result<Bytes>>),
}
110
/// A handle to a pending RPC reply.
///
/// Created by [`Session::rpc_send()`], this allows sending multiple RPCs
/// before awaiting any replies (pipelining).
pub struct RpcFuture {
    /// Receives the routed reply (or error) from the background reader task.
    rx: tokio::sync::oneshot::Receiver<crate::Result<RpcReply>>,
    /// The message-id allocated for this RPC.
    msg_id: u32,
    /// Session-level timeout applied by [`RpcFuture::response()`];
    /// `None` means wait indefinitely.
    rpc_timeout: Option<Duration>,
}
120
121impl RpcFuture {
122    /// The message-id of this RPC.
123    pub fn message_id(&self) -> u32 {
124        self.msg_id
125    }
126
127    /// Await the RPC reply.
128    ///
129    /// If an `rpc_timeout` was configured, this will fail with
130    /// [`TransportError::Timeout`]
131    /// if the server does not reply in time.
132    pub async fn response(self) -> crate::Result<RpcReply> {
133        let result = match self.rpc_timeout {
134            Some(duration) => tokio::time::timeout(duration, self.rx)
135                .await
136                .map_err(|_| crate::Error::Transport(TransportError::Timeout(duration)))?,
137            None => self.rx.await,
138        };
139        result.map_err(|_| crate::Error::SessionClosed)?
140    }
141
142    /// Await the RPC reply with an explicit timeout, ignoring the
143    /// session-level `rpc_timeout`.
144    ///
145    /// Fails with [`TransportError::Timeout`]
146    /// if the server does not reply within `timeout`.
147    pub async fn response_with_timeout(self, timeout: Duration) -> crate::Result<RpcReply> {
148        let result = tokio::time::timeout(timeout, self.rx)
149            .await
150            .map_err(|_| crate::Error::Transport(TransportError::Timeout(timeout)))?;
151        result.map_err(|_| crate::Error::SessionClosed)?
152    }
153}
154
155// =============================================================================
156// RpcStream — streaming RPC response with AsyncRead
157// =============================================================================
158
/// A streaming RPC response that yields raw XML bytes as they arrive.
///
/// Implements [`AsyncRead`] so it can be plugged directly into compression
/// encoders, file writers, or any byte-oriented consumer without buffering
/// the entire response.
///
/// Created by [`Session::rpc_stream()`].
///
/// # Example
///
/// ```no_run
/// # use netconf_rust::Session;
/// # use tokio::io::AsyncReadExt;
/// # async fn example(session: &Session) -> netconf_rust::Result<()> {
/// let mut stream = session.rpc_stream("<get-config><source><running/></source></get-config>").await?;
/// let mut buf = [0u8; 8192];
/// loop {
///     let n = stream.read(&mut buf).await?;
///     if n == 0 { break; }
///     // process buf[..n] — e.g. feed to a compressor
/// }
/// # Ok(())
/// # }
/// ```
pub struct RpcStream {
    /// Chunks forwarded by the background reader task.
    rx: tokio::sync::mpsc::Receiver<crate::Result<Bytes>>,
    /// Partially consumed chunk from the last `poll_read`.
    current: Bytes,
    /// The message-id of the RPC this stream answers.
    msg_id: u32,
    /// True once the channel closed or delivered an error; reads then EOF.
    done: bool,
}
190
impl RpcStream {
    /// The message-id of this streaming RPC.
    pub fn message_id(&self) -> u32 {
        self.msg_id
    }

    /// Whether the stream has finished (all chunks received).
    ///
    /// Set by `poll_read` once the channel closes or yields an error;
    /// subsequent reads return EOF.
    pub fn is_done(&self) -> bool {
        self.done
    }
}
202
203impl AsyncRead for RpcStream {
204    fn poll_read(
205        mut self: Pin<&mut Self>,
206        cx: &mut Context<'_>,
207        buf: &mut ReadBuf<'_>,
208    ) -> Poll<io::Result<()>> {
209        // 1. Serve from the partially-consumed current chunk.
210        if !self.current.is_empty() {
211            let n = std::cmp::min(buf.remaining(), self.current.len());
212            buf.put_slice(&self.current[..n]);
213            self.current.advance(n);
214            return Poll::Ready(Ok(()));
215        }
216
217        // 2. If done, return EOF.
218        if self.done {
219            return Poll::Ready(Ok(()));
220        }
221
222        // 3. Poll the channel for the next chunk.
223        match self.rx.poll_recv(cx) {
224            Poll::Ready(Some(Ok(chunk))) => {
225                let n = std::cmp::min(buf.remaining(), chunk.len());
226                buf.put_slice(&chunk[..n]);
227                if n < chunk.len() {
228                    self.current = chunk.slice(n..);
229                }
230                Poll::Ready(Ok(()))
231            }
232            Poll::Ready(Some(Err(e))) => {
233                self.done = true;
234                Poll::Ready(Err(io::Error::other(e.to_string())))
235            }
236            Poll::Ready(None) => {
237                // Channel closed — stream complete.
238                self.done = true;
239                Poll::Ready(Ok(()))
240            }
241            Poll::Pending => Poll::Pending,
242        }
243    }
244}
245
246// =============================================================================
247// SessionInner
248// =============================================================================
249
/// State shared between the `Session` handle and the background reader task.
struct SessionInner {
    /// Pending RPC replies: message-id → pending RPC (normal or stream).
    pending: Mutex<HashMap<u32, PendingRpc>>,
    /// Session lifecycle state, stored as AtomicU8 for lock-free access
    /// across the session and reader task.
    ///
    /// State transitions:
    ///   Ready → Closing → Closed   (graceful: user calls close_session)
    ///   Ready → Closed             (abrupt: reader hits EOF or error)
    ///
    /// - Ready:   normal operation, RPCs can be sent
    /// - Closing: close-session RPC in flight, new RPCs rejected
    /// - Closed:  session terminated, all operations fail
    state: AtomicU8,
    /// Sender for the disconnect notification. The reader_loop sends the
    /// reason just before exiting. Receivers (from `Session::disconnected()`)
    /// wake up immediately.
    disconnect_tx: tokio::sync::watch::Sender<Option<DisconnectReason>>,
    /// Anchor instant for computing `last_rpc_at` from `last_rpc_nanos`.
    created_at: Instant,
    /// Nanoseconds elapsed since `created_at` when the last RPC reply was
    /// successfully routed. 0 means no reply has been received yet.
    last_rpc_nanos: AtomicU64,
    /// Number of streaming RPCs currently in flight (removed from `pending`
    /// but not yet completed with `EndOfMessage`).
    active_streams: AtomicUsize,
}
277
278impl SessionInner {
279    fn state(&self) -> SessionState {
280        SessionState::from_u8(self.state.load(Ordering::Acquire))
281    }
282
283    fn set_state(&self, state: SessionState) {
284        self.state.store(state as u8, Ordering::Release);
285    }
286
287    fn drain_pending(&self) -> usize {
288        let mut pending = self.pending.lock().unwrap();
289        let count = pending.len();
290        for (_, rpc) in pending.drain() {
291            match rpc {
292                PendingRpc::Normal(tx) => {
293                    let _ = tx.send(Err(crate::Error::SessionClosed));
294                }
295                PendingRpc::Stream(tx) => {
296                    let _ = tx.try_send(Err(crate::Error::SessionClosed));
297                }
298            }
299        }
300        count
301    }
302}
303
/// Write half + codec, behind a `tokio::sync::Mutex` so Session methods
/// can take `&self`. The Mutex is held only during encode + write + flush.
struct WriterState {
    /// Write half of the split NETCONF stream.
    writer: WriteHalf<NetconfStream>,
    /// Codec used to frame outgoing messages (EOM or chunked).
    codec: NetconfCodec,
}
310
/// A NETCONF session with pipelining and streaming support.
///
/// Architecture:
/// - **Writer**: the session holds the write half of the stream and a codec
///   for encoding outgoing RPCs.
/// - **Reader task**: a background tokio task reads framed chunks from
///   the read half, classifies them (`<rpc-reply>` vs `<notification>`),
///   and routes them to the correct handler.
/// - **Pipelining**: [`rpc_send()`](Session::rpc_send) writes an RPC and
///   returns an [`RpcFuture`] without waiting for the reply. Multiple RPCs
///   can be in flight simultaneously.
/// - **Streaming**: [`rpc_stream()`](Session::rpc_stream) writes an RPC and
///   returns an [`RpcStream`] that yields raw bytes as they arrive. This
///   avoids buffering the entire response for large payloads.
pub struct Session {
    /// Write state behind an async Mutex. Held only for the duration of
    /// encoding and flushing a single message.
    writer: tokio::sync::Mutex<WriterState>,

    /// Shared state between this session and the background reader task.
    /// Contains the pending RPC map, notification channel, and session state.
    inner: Arc<SessionInner>,

    /// The server's hello response, containing its capabilities and session ID.
    server_hello: ServerHello,

    /// Negotiated framing mode (EOM for 1.0-only servers, chunked for 1.1).
    framing: FramingMode,

    /// Timeout applied to each RPC response wait. `None` disables the
    /// per-RPC timeout.
    rpc_timeout: Option<Duration>,

    /// Channel capacity for streaming RPCs (see [`Config::stream_buffer_capacity`]).
    stream_buffer_capacity: usize,

    /// Receiver for disconnect notifications. Cloned for each call to
    /// `disconnected()`, allowing multiple independent subscribers.
    disconnect_rx: tokio::sync::watch::Receiver<Option<DisconnectReason>>,

    /// Instant when the session was established (hello exchange complete).
    connected_since: Instant,

    /// Handle to the background reader task. Aborted on drop to ensure
    /// the task doesn't outlive the session. Declared before `_keep_alive`
    /// so it is dropped first — the reader task must be aborted before
    /// the SSH connection is torn down.
    _reader_handle: tokio::task::JoinHandle<()>,

    /// Holds the SSH handle alive. Dropping this tears down the SSH connection,
    /// which would invalidate the stream. Never accessed, just kept alive.
    _keep_alive: Option<Box<dyn std::any::Any + Send + Sync>>,
}
363
364impl Drop for Session {
365    fn drop(&mut self) {
366        // 1. Drain pending RPCs → waiters get deterministic SessionClosed
367        let drained = self.inner.drain_pending();
368        if drained > 0 {
369            debug!(
370                "session {}: drop: drained {drained} pending RPCs",
371                self.server_hello.session_id
372            );
373        }
374        // 2. Mark session closed
375        self.inner.set_state(SessionState::Closed);
376        // 3. Notify disconnect subscribers (only if reader_loop hasn't already)
377        self.inner.disconnect_tx.send_if_modified(|current| {
378            if current.is_none() {
379                *current = Some(DisconnectReason::Dropped);
380                true
381            } else {
382                false
383            }
384        });
385        // 4. Abort reader task (its cleanup is now a no-op)
386        self._reader_handle.abort();
387    }
388}
389
390impl Session {
    /// Connect to a NETCONF server over SSH with password authentication.
    ///
    /// Convenience wrapper around [`Session::connect_with_config()`] using
    /// [`Config::default()`].
    pub async fn connect(
        host: &str,
        port: u16,
        username: &str,
        password: &str,
    ) -> crate::Result<Self> {
        Self::connect_with_config(host, port, username, password, Config::default()).await
    }
400
    /// Connect with custom configuration.
    ///
    /// Establishes the SSH transport, performs the NETCONF hello exchange
    /// (capability/framing negotiation), then assembles the session.
    pub async fn connect_with_config(
        host: &str,
        port: u16,
        username: &str,
        password: &str,
        config: Config,
    ) -> crate::Result<Self> {
        let (mut stream, keep_alive) =
            crate::transport::connect(host, port, username, password, &config).await?;
        let (server_hello, framing) = exchange_hello(&mut stream, &config).await?;
        Self::build(stream, Some(keep_alive), server_hello, framing, config)
    }
414
    /// Create a session from an existing stream (useful for testing).
    ///
    /// Convenience wrapper around [`Session::from_stream_with_config()`]
    /// using [`Config::default()`].
    pub async fn from_stream<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
        stream: S,
    ) -> crate::Result<Self> {
        Self::from_stream_with_config(stream, Config::default()).await
    }
421
    /// Create a session from an existing stream with custom configuration.
    ///
    /// Performs the hello exchange on the given stream before boxing it
    /// and assembling the session (no SSH keep-alive handle is attached).
    pub async fn from_stream_with_config<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
        mut stream: S,
        config: Config,
    ) -> crate::Result<Self> {
        let (server_hello, framing) = exchange_hello(&mut stream, &config).await?;
        let boxed: NetconfStream = Box::new(stream);
        Self::build(boxed, None, server_hello, framing, config)
    }
431
    /// Assemble a `Session` from a stream that has already completed the
    /// hello exchange: split the stream, spawn the background reader task,
    /// and wire up the state shared between the two halves.
    fn build(
        stream: NetconfStream,
        keep_alive: Option<Box<dyn std::any::Any + Send + Sync>>,
        server_hello: ServerHello,
        framing: FramingMode,
        config: Config,
    ) -> crate::Result<Self> {
        debug!(
            "session {}: building (framing={:?}, capabilities={})",
            server_hello.session_id,
            framing,
            server_hello.capabilities.len()
        );
        let (read_half, write_half) = tokio::io::split(stream);

        // Reader and writer each get their own codec instance so framing
        // state on the two halves stays independent.
        let read_codec = NetconfCodec::new(framing, config.codec);
        let write_codec = NetconfCodec::new(framing, config.codec);
        let reader = FramedRead::new(read_half, read_codec);

        // Watch channel used to broadcast the eventual disconnect reason;
        // it starts out as `None` (still connected).
        let (disconnect_tx, disconnect_rx) = tokio::sync::watch::channel(None);

        let inner = Arc::new(SessionInner {
            pending: Mutex::new(HashMap::new()),
            state: AtomicU8::new(SessionState::Ready as u8),
            disconnect_tx,
            created_at: Instant::now(),
            last_rpc_nanos: AtomicU64::new(0),
            active_streams: AtomicUsize::new(0),
        });

        // Background task: owns the read half and runs `reader_loop` until
        // EOF or a transport error.
        let reader_inner = Arc::clone(&inner);
        let session_id = server_hello.session_id;
        let reader_handle = tokio::spawn(async move {
            reader_loop(reader, reader_inner, session_id).await;
        });

        Ok(Self {
            writer: tokio::sync::Mutex::new(WriterState {
                writer: write_half,
                codec: write_codec,
            }),
            inner,
            server_hello,
            framing,
            rpc_timeout: config.rpc_timeout,
            stream_buffer_capacity: config.stream_buffer_capacity,
            disconnect_rx,
            connected_since: Instant::now(),
            _reader_handle: reader_handle,
            _keep_alive: keep_alive,
        })
    }
484
    /// The session-id assigned by the server in its hello message.
    pub fn session_id(&self) -> u32 {
        self.server_hello.session_id
    }

    /// Capability URIs advertised by the server during the hello exchange.
    pub fn server_capabilities(&self) -> &[String] {
        &self.server_hello.capabilities
    }

    /// Negotiated framing mode (EOM for 1.0-only servers, chunked for 1.1).
    pub fn framing_mode(&self) -> FramingMode {
        self.framing
    }

    /// Current lifecycle state of the session.
    pub fn state(&self) -> SessionState {
        self.inner.state()
    }
500
    /// Returns a future that completes when the session disconnects.
    ///
    /// Can be called multiple times — each call clones an internal
    /// `watch::Receiver`, so multiple tasks can independently await
    /// the same disconnect event. If the session is already disconnected
    /// when called, returns immediately.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use netconf_rust::Session;
    /// # async fn example(session: &Session) {
    /// let reason = session.disconnected().await;
    /// println!("session died: {reason}");
    /// # }
    /// ```
    pub fn disconnected(&self) -> impl Future<Output = DisconnectReason> + Send + 'static {
        // The returned future owns its receiver clone, so it remains valid
        // even if this `Session` is dropped while someone awaits it.
        let mut rx = self.disconnect_rx.clone();
        async move {
            // Check if already disconnected (late subscriber).
            if let Some(reason) = rx.borrow_and_update().clone() {
                return reason;
            }
            // Wait for the reader_loop to send the reason. If the sender
            // is dropped (Session dropped, reader aborted), treat as Dropped.
            loop {
                if rx.changed().await.is_err() {
                    return DisconnectReason::Dropped;
                }
                // The watch value starts as `None`; keep looping until an
                // actual reason has been published.
                if let Some(reason) = rx.borrow_and_update().clone() {
                    return reason;
                }
            }
        }
    }
536
537    fn check_state(&self) -> crate::Result<()> {
538        let state = self.inner.state();
539        if state != SessionState::Ready {
540            return Err(crate::Error::InvalidState(state.to_string()));
541        }
542        Ok(())
543    }
544
545    /// Encode a message with the negotiated framing (EOM or chunked) and
546    /// write it to the stream.
547    async fn send_encoded(&self, xml: &str) -> crate::Result<()> {
548        let mut buf = BytesMut::new();
549        let mut state = self.writer.lock().await;
550        state.codec.encode(Bytes::from(xml.to_string()), &mut buf)?;
551        trace!(
552            "session {}: writing {} bytes to stream",
553            self.server_hello.session_id,
554            buf.len()
555        );
556        state.writer.write_all(&buf).await?;
557        state.writer.flush().await?;
558        Ok(())
559    }
560
561    /// Send a raw RPC and return a future for the reply (pipelining).
562    ///
563    /// This writes the RPC to the server immediately but does not wait
564    /// for the reply. Call [`RpcFuture::response()`] to await the reply.
565    /// Multiple RPCs can be pipelined by calling this repeatedly before
566    /// awaiting any of them.
567    pub async fn rpc_send(&self, inner_xml: &str) -> crate::Result<RpcFuture> {
568        self.check_state()?;
569        let (msg_id, xml) = message::build_rpc(inner_xml);
570        debug!(
571            "session {}: sending rpc message-id={} ({} bytes)",
572            self.server_hello.session_id,
573            msg_id,
574            xml.len()
575        );
576        trace!(
577            "session {}: rpc content: {}",
578            self.server_hello.session_id, inner_xml
579        );
580        let (tx, rx) = tokio::sync::oneshot::channel();
581
582        self.inner
583            .pending
584            .lock()
585            .unwrap()
586            .insert(msg_id, PendingRpc::Normal(tx));
587
588        if let Err(e) = self.send_encoded(&xml).await {
589            debug!(
590                "session {}: send failed for message-id={}: {}",
591                self.server_hello.session_id, msg_id, e
592            );
593            self.inner.pending.lock().unwrap().remove(&msg_id);
594            return Err(e);
595        }
596        Ok(RpcFuture {
597            rx,
598            msg_id,
599            rpc_timeout: self.rpc_timeout,
600        })
601    }
602
603    /// Send a raw RPC and wait for the reply.
604    pub async fn rpc_raw(&self, inner_xml: &str) -> crate::Result<RpcReply> {
605        let future = self.rpc_send(inner_xml).await?;
606        future.response().await
607    }
608
609    /// Send a raw RPC and return a streaming response.
610    ///
611    /// Unlike [`rpc_send()`](Session::rpc_send) which buffers the entire
612    /// response, this returns an [`RpcStream`] that yields raw XML bytes
613    /// as individual chunks arrive from the server. The stream implements
614    /// [`AsyncRead`], so it can be piped directly into compression encoders,
615    /// file writers, or any byte-oriented consumer.
616    ///
617    /// Normal (buffered) and streaming RPCs can be freely interleaved on
618    /// the same session — the reader task routes each message independently
619    /// based on its `message-id`.
620    ///
621    /// # Example
622    ///
623    /// ```no_run
624    /// # use netconf_rust::Session;
625    /// # use tokio::io::AsyncReadExt;
626    /// # async fn example(session: &Session) -> netconf_rust::Result<()> {
627    /// // Pipeline a normal RPC alongside a streaming one
628    /// let edit_future = session.rpc_send("<edit-config>...</edit-config>").await?;
629    /// let mut config_stream = session.rpc_stream("<get-config><source><running/></source></get-config>").await?;
630    ///
631    /// // Stream the large response
632    /// let mut buf = [0u8; 8192];
633    /// loop {
634    ///     let n = config_stream.read(&mut buf).await?;
635    ///     if n == 0 { break; }
636    ///     // process buf[..n]
637    /// }
638    ///
639    /// // Collect the normal RPC reply
640    /// let edit_reply = edit_future.response().await?;
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn rpc_stream(&self, inner_xml: &str) -> crate::Result<RpcStream> {
645        self.check_state()?;
646        let (msg_id, xml) = message::build_rpc(inner_xml);
647        debug!(
648            "session {}: sending streaming rpc message-id={} ({} bytes)",
649            self.server_hello.session_id,
650            msg_id,
651            xml.len()
652        );
653
654        let (tx, rx) = tokio::sync::mpsc::channel(self.stream_buffer_capacity);
655
656        self.inner
657            .pending
658            .lock()
659            .unwrap()
660            .insert(msg_id, PendingRpc::Stream(tx));
661
662        if let Err(e) = self.send_encoded(&xml).await {
663            debug!(
664                "session {}: send failed for streaming message-id={}: {}",
665                self.server_hello.session_id, msg_id, e
666            );
667            self.inner.pending.lock().unwrap().remove(&msg_id);
668            return Err(e);
669        }
670
671        Ok(RpcStream {
672            rx,
673            current: Bytes::new(),
674            msg_id,
675            done: false,
676        })
677    }
678
679    /// Internal rpc send that skips state check. Only used for sending close-session.
680    async fn rpc_send_unchecked(&self, inner_xml: &str) -> crate::Result<RpcFuture> {
681        let (msg_id, xml) = message::build_rpc(inner_xml);
682        let (tx, rx) = tokio::sync::oneshot::channel();
683
684        self.inner
685            .pending
686            .lock()
687            .unwrap()
688            .insert(msg_id, PendingRpc::Normal(tx));
689
690        if let Err(e) = self.send_encoded(&xml).await {
691            self.inner.pending.lock().unwrap().remove(&msg_id);
692            return Err(e);
693        }
694
695        Ok(RpcFuture {
696            rx,
697            msg_id,
698            rpc_timeout: self.rpc_timeout,
699        })
700    }
701
702    /// Retrieve configuration from a datastore.
703    pub async fn get_config(
704        &self,
705        source: Datastore,
706        filter: Option<&str>,
707    ) -> crate::Result<String> {
708        let filter_xml = match filter {
709            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
710            None => String::new(),
711        };
712        let inner = format!(
713            "<get-config><source>{}</source>{filter_xml}</get-config>",
714            source.as_xml()
715        );
716        let reply = self.rpc_raw(&inner).await?;
717        reply_to_data(reply)
718    }
719
720    /// Retrieve configuration as a zero-copy `DataPayload`.
721    ///
722    /// Same as `get_config()` but returns a `DataPayload` instead of `String`,
723    /// avoiding a copy of the response body. Use `payload.as_str()` for a
724    /// zero-copy `&str` view, or `payload.reader()` for streaming XML events.
725    pub async fn get_config_payload(
726        &self,
727        source: Datastore,
728        filter: Option<&str>,
729    ) -> crate::Result<DataPayload> {
730        let filter_xml = match filter {
731            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
732            None => String::new(),
733        };
734        let inner = format!(
735            "<get-config><source>{}</source>{filter_xml}</get-config>",
736            source.as_xml()
737        );
738        let reply = self.rpc_raw(&inner).await?;
739        reply.into_data()
740    }
741
742    /// Retrieve configuration from a datastore as a streaming response.
743    ///
744    /// Same as `get_config()` but returns an [`RpcStream`] instead of
745    /// buffering the entire response.
746    pub async fn get_config_stream(
747        &self,
748        source: Datastore,
749        filter: Option<&str>,
750    ) -> crate::Result<RpcStream> {
751        let filter_xml = match filter {
752            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
753            None => String::new(),
754        };
755        let inner = format!(
756            "<get-config><source>{}</source>{filter_xml}</get-config>",
757            source.as_xml()
758        );
759        self.rpc_stream(&inner).await
760    }
761
762    /// Retrieve running configuration and state data.
763    pub async fn get(&self, filter: Option<&str>) -> crate::Result<String> {
764        let filter_xml = match filter {
765            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
766            None => String::new(),
767        };
768        let inner = format!("<get>{filter_xml}</get>");
769        let reply = self.rpc_raw(&inner).await?;
770        reply_to_data(reply)
771    }
772
773    /// Retrieve running configuration and state data as a zero-copy `DataPayload`.
774    ///
775    /// Same as `get()` but returns a `DataPayload` instead of `String`,
776    /// avoiding a copy of the response body.
777    pub async fn get_payload(&self, filter: Option<&str>) -> crate::Result<DataPayload> {
778        let filter_xml = match filter {
779            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
780            None => String::new(),
781        };
782        let inner = format!("<get>{filter_xml}</get>");
783        let reply = self.rpc_raw(&inner).await?;
784        reply.into_data()
785    }
786
787    /// Retrieve running configuration and state data as a streaming response.
788    ///
789    /// Same as `get()` but returns an [`RpcStream`] instead of buffering
790    /// the entire response.
791    pub async fn get_stream(&self, filter: Option<&str>) -> crate::Result<RpcStream> {
792        let filter_xml = match filter {
793            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
794            None => String::new(),
795        };
796        let inner = format!("<get>{filter_xml}</get>");
797        self.rpc_stream(&inner).await
798    }
799
800    /// Edit the configuration of a target datastore.
801    pub async fn edit_config(&self, target: Datastore, config: &str) -> crate::Result<()> {
802        let inner = format!(
803            "<edit-config><target>{}</target><config>{config}</config></edit-config>",
804            target.as_xml()
805        );
806        let reply = self.rpc_raw(&inner).await?;
807        reply_to_ok(reply)
808    }
809
810    /// Lock a datastore
811    pub async fn lock(&self, target: Datastore) -> crate::Result<()> {
812        let inner = format!("<lock><target>{}</target></lock>", target.as_xml());
813        let reply = self.rpc_raw(&inner).await?;
814        reply_to_ok(reply)
815    }
816
817    /// Unlock a datastore.
818    pub async fn unlock(&self, target: Datastore) -> crate::Result<()> {
819        let inner = format!("<unlock><target>{}</target></unlock>", target.as_xml());
820        let reply = self.rpc_raw(&inner).await?;
821        reply_to_ok(reply)
822    }
823
824    /// Commit the candidate configuration to running.
825    pub async fn commit(&self) -> crate::Result<()> {
826        let reply = self.rpc_raw("<commit/>").await?;
827        reply_to_ok(reply)
828    }
829
    /// Gracefully close the NETCONF session.
    ///
    /// Sends `<close-session/>` and awaits the server's reply. The state is
    /// moved to `Closed` whether or not the close RPC succeeds, so the
    /// session ends up terminal on every path. Fails with `InvalidState`
    /// if the session is not currently `Ready`.
    pub async fn close_session(&self) -> crate::Result<()> {
        // Atomically transition Ready → Closing. If another caller already
        // moved us out of Ready, we fail immediately.
        let prev = self.inner.state.compare_exchange(
            SessionState::Ready as u8,
            SessionState::Closing as u8,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        if let Err(current) = prev {
            let state = SessionState::from_u8(current);
            return Err(crate::Error::InvalidState(state.to_string()));
        }
        debug!("session {}: closing", self.server_hello.session_id);
        // Use the unchecked sender: the state is already Closing, which a
        // normal rpc_send (via check_state) would reject.
        let result = self.rpc_send_unchecked("<close-session/>").await;
        match result {
            Ok(future) => {
                let reply = future.response().await;
                // Mark Closed before propagating any reply error.
                self.inner.set_state(SessionState::Closed);
                debug!(
                    "session {}: closed gracefully",
                    self.server_hello.session_id
                );
                reply_to_ok(reply?)
            }
            Err(e) => {
                self.inner.set_state(SessionState::Closed);
                debug!(
                    "session {}: close failed: {}",
                    self.server_hello.session_id, e
                );
                Err(e)
            }
        }
    }
866
867    /// Gracefully close the NETCONF session and shut down the transport.
868    ///
869    /// Sends a `<close-session/>` RPC, shuts down the write half of the
870    /// stream, and drops the session (aborting the reader task and releasing
871    /// the SSH handle). Prefer this over [`close_session()`](Self::close_session)
872    /// when you are done with the session entirely.
873    pub async fn close(self) -> crate::Result<()> {
874        let result = self.close_session().await;
875        self.writer.lock().await.writer.shutdown().await.ok();
876        // `self` is dropped here — reader task aborted, SSH handle released
877        result
878    }
879
880    /// Force-close another NETCONF session.
881    pub async fn kill_session(&self, session_id: u32) -> crate::Result<()> {
882        let inner = format!("<kill-session><session-id>{session_id}</session-id></kill-session>");
883        let reply = self.rpc_raw(&inner).await?;
884        reply_to_ok(reply)
885    }
886
887    /// Return a wrapper that applies `timeout` to every RPC sent through it,
888    /// overriding the session-level `rpc_timeout`.
889    pub fn with_timeout(&self, timeout: Duration) -> SessionWithTimeout<'_> {
890        SessionWithTimeout {
891            session: self,
892            timeout,
893        }
894    }
895
896    /// Number of RPCs that have been sent but not yet fully replied to.
897    ///
898    /// This includes both RPCs awaiting their first reply byte (in the
899    /// pending map) and streaming RPCs whose response is in flight but
900    /// not yet complete.
901    pub fn pending_rpc_count(&self) -> usize {
902        self.inner.pending.lock().unwrap().len() + self.inner.active_streams.load(Ordering::Acquire)
903    }
904
905    /// The [`Instant`] when the most recent RPC reply was received, or
906    /// `None` if no reply has been received yet.
907    pub fn last_rpc_at(&self) -> Option<Instant> {
908        let nanos = self.inner.last_rpc_nanos.load(Ordering::Acquire);
909        if nanos == 0 {
910            None
911        } else {
912            Some(self.inner.created_at + Duration::from_nanos(nanos))
913        }
914    }
915
    /// The [`Instant`] when the session was established (hello exchange
    /// complete and reader task started).
    ///
    /// Useful together with [`last_rpc_at()`](Self::last_rpc_at) for
    /// idle-time and uptime accounting.
    pub fn connected_since(&self) -> Instant {
        self.connected_since
    }
921}
922
/// A wrapper around [`Session`] that applies a per-call timeout to every RPC.
///
/// Created by [`Session::with_timeout()`]. Each method sends the RPC via
/// the underlying session and awaits the reply with the configured timeout.
pub struct SessionWithTimeout<'a> {
    /// Borrowed session that performs the actual send/receive.
    session: &'a Session,
    /// Reply deadline applied to every RPC issued through this wrapper.
    timeout: Duration,
}
931
932impl SessionWithTimeout<'_> {
933    /// Send a raw RPC and wait for the reply with the configured timeout.
934    pub async fn rpc_raw(&self, inner_xml: &str) -> crate::Result<RpcReply> {
935        let future = self.session.rpc_send(inner_xml).await?;
936        future.response_with_timeout(self.timeout).await
937    }
938
939    /// Retrieve configuration from a datastore.
940    pub async fn get_config(
941        &self,
942        source: Datastore,
943        filter: Option<&str>,
944    ) -> crate::Result<String> {
945        let filter_xml = match filter {
946            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
947            None => String::new(),
948        };
949        let inner = format!(
950            "<get-config><source>{}</source>{filter_xml}</get-config>",
951            source.as_xml()
952        );
953        let reply = self.rpc_raw(&inner).await?;
954        reply_to_data(reply)
955    }
956
957    /// Retrieve configuration as a zero-copy `DataPayload`.
958    pub async fn get_config_payload(
959        &self,
960        source: Datastore,
961        filter: Option<&str>,
962    ) -> crate::Result<DataPayload> {
963        let filter_xml = match filter {
964            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
965            None => String::new(),
966        };
967        let inner = format!(
968            "<get-config><source>{}</source>{filter_xml}</get-config>",
969            source.as_xml()
970        );
971        let reply = self.rpc_raw(&inner).await?;
972        reply.into_data()
973    }
974
975    /// Retrieve running configuration and state data.
976    pub async fn get(&self, filter: Option<&str>) -> crate::Result<String> {
977        let filter_xml = match filter {
978            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
979            None => String::new(),
980        };
981        let inner = format!("<get>{filter_xml}</get>");
982        let reply = self.rpc_raw(&inner).await?;
983        reply_to_data(reply)
984    }
985
986    /// Retrieve running configuration and state data as a zero-copy `DataPayload`.
987    pub async fn get_payload(&self, filter: Option<&str>) -> crate::Result<DataPayload> {
988        let filter_xml = match filter {
989            Some(f) => format!(r#"<filter type="subtree">{f}</filter>"#),
990            None => String::new(),
991        };
992        let inner = format!("<get>{filter_xml}</get>");
993        let reply = self.rpc_raw(&inner).await?;
994        reply.into_data()
995    }
996
997    /// Edit the configuration of a target datastore.
998    pub async fn edit_config(&self, target: Datastore, config: &str) -> crate::Result<()> {
999        let inner = format!(
1000            "<edit-config><target>{}</target><config>{config}</config></edit-config>",
1001            target.as_xml()
1002        );
1003        let reply = self.rpc_raw(&inner).await?;
1004        reply_to_ok(reply)
1005    }
1006
1007    /// Lock a datastore.
1008    pub async fn lock(&self, target: Datastore) -> crate::Result<()> {
1009        let inner = format!("<lock><target>{}</target></lock>", target.as_xml());
1010        let reply = self.rpc_raw(&inner).await?;
1011        reply_to_ok(reply)
1012    }
1013
1014    /// Unlock a datastore.
1015    pub async fn unlock(&self, target: Datastore) -> crate::Result<()> {
1016        let inner = format!("<unlock><target>{}</target></unlock>", target.as_xml());
1017        let reply = self.rpc_raw(&inner).await?;
1018        reply_to_ok(reply)
1019    }
1020
1021    /// Commit the candidate configuration to running.
1022    pub async fn commit(&self) -> crate::Result<()> {
1023        let reply = self.rpc_raw("<commit/>").await?;
1024        reply_to_ok(reply)
1025    }
1026}
1027
1028/// Perform the NETCONF hello exchange, optionally with a timeout.
1029async fn exchange_hello<S: AsyncRead + AsyncWrite + Unpin>(
1030    stream: &mut S,
1031    config: &Config,
1032) -> crate::Result<(ServerHello, FramingMode)> {
1033    let fut = crate::hello::exchange(stream, config.codec.max_message_size);
1034    match config.hello_timeout {
1035        Some(duration) => tokio::time::timeout(duration, fut)
1036            .await
1037            .map_err(|_| crate::Error::Transport(TransportError::Timeout(duration)))?,
1038        None => fut.await,
1039    }
1040}
1041
1042// =============================================================================
1043// Reader task — state machine for chunk-level routing
1044// =============================================================================
1045
/// Per-message state machine for the reader task.
///
/// Tracks whether the current message is being accumulated (normal RPC)
/// or streamed (streaming RPC). The state transitions on each
/// [`DecodedFrame`] from the codec.
enum ReaderMessageState {
    /// Waiting for the first chunk of a new message.
    /// Accumulates bytes until the `message-id` can be extracted.
    AwaitingHeader { buf: BytesMut },
    /// Accumulating chunks for a normal (non-streaming) RPC.
    /// `msg_id` was parsed from the header; `buf` holds all bytes so far.
    Accumulating { msg_id: u32, buf: BytesMut },
    /// Forwarding chunks for a streaming RPC.
    /// Chunks are sent over `tx` as they arrive; dropping `tx` signals
    /// end-of-message to the consumer.
    Streaming {
        msg_id: u32,
        tx: tokio::sync::mpsc::Sender<crate::Result<Bytes>>,
    },
}
1063
/// This loop is the only thing reading from the SSH stream. It runs in a
/// background tokio task. The session's main API (the writer side) never
/// reads — it only writes RPCs and waits on oneshot/mpsc channels.
/// This separation is what makes pipelining and streaming work.
///
/// The codec now yields [`DecodedFrame`] items (individual chunks and
/// end-of-message markers) instead of complete messages. The reader task
/// uses a [`ReaderMessageState`] state machine to decide per-message
/// whether to accumulate chunks (normal RPCs) or forward them (streaming
/// RPCs). The `message-id` from the first chunk determines the routing.
///
/// On exit (EOF or transport error) the loop fails any in-flight
/// streaming RPC, drains the pending map, marks the session `Closed`,
/// and broadcasts the [`DisconnectReason`].
async fn reader_loop(
    mut reader: FramedRead<ReadHalf<NetconfStream>, NetconfCodec>,
    inner: Arc<SessionInner>,
    session_id: u32,
) {
    debug!("session {}: reader loop started", session_id);
    // Default reason: `reader.next()` returning `None` means a clean EOF.
    let mut disconnect_reason = DisconnectReason::Eof;
    let mut state = ReaderMessageState::AwaitingHeader {
        buf: BytesMut::new(),
    };

    loop {
        // Propagate the closing flag so decode_eof can discard holdback bytes.
        if inner.state() == SessionState::Closing {
            reader.decoder_mut().set_closing();
        }
        let Some(result) = reader.next().await else {
            break;
        };
        match result {
            Ok(frame) => {
                state = process_frame(frame, state, &inner, session_id).await;
            }
            Err(e) => {
                debug!("session {}: reader error: {e}", session_id);
                disconnect_reason = DisconnectReason::TransportError(e.to_string());

                // Notify any in-flight streaming RPC about the error.
                // try_send is used so the reader never blocks on a slow
                // or absent consumer.
                if let ReaderMessageState::Streaming { tx, .. } = &state {
                    let _ = tx.try_send(Err(crate::Error::SessionClosed));
                }

                let drained = inner.drain_pending();
                if drained > 0 {
                    debug!(
                        "session {}: drained {} pending RPCs after error",
                        session_id, drained
                    );
                }
                break;
            }
        }
    }

    // Notify any in-flight streaming RPC about session close.
    // NOTE(review): after a transport error this repeats the try_send in the
    // error arm above, so a streaming consumer may observe two
    // `SessionClosed` errors — confirm whether that is intended.
    if let ReaderMessageState::Streaming { tx, .. } = &state {
        let _ = tx.try_send(Err(crate::Error::SessionClosed));
    }

    // Drain any remaining pending RPCs.
    {
        let drained = inner.drain_pending();
        if drained > 0 {
            debug!(
                "session {}: drained {} pending RPCs on stream close",
                session_id, drained
            );
        }
    }

    inner.set_state(SessionState::Closed);
    // A send failure just means nobody is watching for disconnects.
    let _ = inner.disconnect_tx.send(Some(disconnect_reason));
    debug!("session {}: reader loop ended", session_id);
}
1138
/// Process a single decoded frame, advancing the per-message state machine.
///
/// Returns the next [`ReaderMessageState`]. Chunk frames either buffer
/// bytes (header/accumulate states) or are forwarded to a streaming
/// consumer; `EndOfMessage` frames finish the current message and reset
/// the machine to `AwaitingHeader`.
async fn process_frame(
    frame: DecodedFrame,
    state: ReaderMessageState,
    inner: &SessionInner,
    session_id: u32,
) -> ReaderMessageState {
    match frame {
        DecodedFrame::Chunk(chunk) => match state {
            ReaderMessageState::AwaitingHeader { mut buf } => {
                buf.extend_from_slice(&chunk);

                // Try to extract the message-id from what we have so far.
                if let Some(msg_id) = extract_message_id_from_bytes(&buf) {
                    // Look up the pending RPC type to decide routing.
                    // The lock is released before the second lookup below.
                    let is_stream = {
                        let pending = inner.pending.lock().unwrap();
                        matches!(pending.get(&msg_id), Some(PendingRpc::Stream(_)))
                    };

                    if is_stream {
                        // Remove the stream sender from pending and transition
                        // to Streaming state.
                        let tx = {
                            let mut pending = inner.pending.lock().unwrap();
                            match pending.remove(&msg_id) {
                                Some(PendingRpc::Stream(tx)) => tx,
                                // Race: might have been drained between the
                                // check above and here. Fall back to accumulate.
                                // NOTE(review): this arm also matches
                                // `Some(PendingRpc::Normal(_))` and would drop
                                // that sender; only `None` appears possible if
                                // pending entries are never replaced — confirm
                                // against the pending-map writers.
                                _ => {
                                    return ReaderMessageState::Accumulating { msg_id, buf };
                                }
                            }
                        };
                        inner.active_streams.fetch_add(1, Ordering::Release);
                        // Forward the buffered header as the first chunk.
                        let _ = tx.send(Ok(buf.freeze())).await;
                        debug!(
                            "session {}: streaming rpc message-id={}",
                            session_id, msg_id
                        );
                        ReaderMessageState::Streaming { msg_id, tx }
                    } else {
                        // Normal RPC or unknown — accumulate.
                        ReaderMessageState::Accumulating { msg_id, buf }
                    }
                } else {
                    // Need more data to find message-id.
                    ReaderMessageState::AwaitingHeader { buf }
                }
            }
            ReaderMessageState::Accumulating { msg_id, mut buf } => {
                buf.extend_from_slice(&chunk);
                ReaderMessageState::Accumulating { msg_id, buf }
            }
            ReaderMessageState::Streaming { msg_id, tx } => {
                // Forward chunk to the streaming consumer. If the consumer
                // dropped the receiver, just discard — we still need to
                // consume until EndOfMessage so we can start the next message.
                let _ = tx.send(Ok(chunk)).await;
                ReaderMessageState::Streaming { msg_id, tx }
            }
        },

        DecodedFrame::EndOfMessage => match state {
            ReaderMessageState::AwaitingHeader { .. } => {
                // Empty message or we never found a message-id — reset.
                trace!("session {}: empty or unparseable message", session_id);
                ReaderMessageState::AwaitingHeader {
                    buf: BytesMut::new(),
                }
            }
            ReaderMessageState::Accumulating { msg_id, buf } => {
                // Complete normal message — classify and route.
                let bytes = buf.freeze();
                trace!(
                    "session {}: complete message for msg-id={} ({} bytes)",
                    session_id,
                    msg_id,
                    bytes.len()
                );

                match message::classify_message(bytes) {
                    Ok(ServerMessage::RpcReply(reply)) => {
                        debug!(
                            "session {}: received rpc-reply message-id={}",
                            session_id, reply.message_id
                        );
                        // Take the waiter out of the pending map, if any.
                        let tx = {
                            let mut pending = inner.pending.lock().unwrap();
                            pending.remove(&reply.message_id)
                        };
                        if let Some(PendingRpc::Normal(tx)) = tx {
                            // Record the reply time as nanoseconds elapsed
                            // since session creation (see `last_rpc_at`).
                            let nanos = inner.created_at.elapsed().as_nanos() as u64;
                            inner.last_rpc_nanos.store(nanos, Ordering::Release);
                            let _ = tx.send(Ok(reply));
                        } else {
                            warn!(
                                "session {}: received reply for unknown message-id {}",
                                session_id, reply.message_id
                            );
                        }
                    }
                    Err(e) => {
                        warn!("session {}: failed to classify message: {e}", session_id);
                    }
                }

                ReaderMessageState::AwaitingHeader {
                    buf: BytesMut::new(),
                }
            }
            ReaderMessageState::Streaming { msg_id, tx } => {
                // End of streaming message — drop the sender to signal EOF.
                drop(tx);
                inner.active_streams.fetch_sub(1, Ordering::Release);
                let nanos = inner.created_at.elapsed().as_nanos() as u64;
                inner.last_rpc_nanos.store(nanos, Ordering::Release);
                debug!(
                    "session {}: streaming message complete for msg-id={}",
                    session_id, msg_id
                );
                ReaderMessageState::AwaitingHeader {
                    buf: BytesMut::new(),
                }
            }
        },
    }
}
1268
1269fn reply_to_data(reply: RpcReply) -> crate::Result<String> {
1270    match reply.body {
1271        RpcReplyBody::Data(payload) => Ok(payload.into_string()),
1272        RpcReplyBody::Ok => Ok(String::new()),
1273        RpcReplyBody::Error(errors) => Err(crate::Error::Rpc {
1274            message_id: reply.message_id,
1275            error: errors
1276                .first()
1277                .map(|e| e.error_message.clone())
1278                .unwrap_or_default(),
1279        }),
1280    }
1281}
1282
1283fn reply_to_ok(reply: RpcReply) -> crate::Result<()> {
1284    match reply.body {
1285        RpcReplyBody::Ok => Ok(()),
1286        RpcReplyBody::Data(_) => Ok(()),
1287        RpcReplyBody::Error(errors) => Err(crate::Error::Rpc {
1288            message_id: reply.message_id,
1289            error: errors
1290                .first()
1291                .map(|e| e.error_message.clone())
1292                .unwrap_or_default(),
1293        }),
1294    }
1295}