Skip to main content

sqlx_sqlserver/
connection.rs

1use futures_core::future::BoxFuture;
2use futures_core::stream::BoxStream;
3use futures_util::{future, stream, StreamExt};
4use native_tls::Certificate;
5use sqlx_core::connection::Connection;
6use sqlx_core::decode::Decode;
7use sqlx_core::error::Error;
8use sqlx_core::executor::{Execute, Executor};
9use sqlx_core::transaction::Transaction;
10use sqlx_core::value::Value;
11use sqlx_core::Either;
12use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
13use tokio::net::TcpStream;
14use tokio_native_tls::TlsConnector;
15
16use crate::error::server_error;
17use crate::protocol::login::{build_login7_packet, Login7Error};
18use crate::protocol::packet::{PacketHeader, PacketStatus, PacketType, PACKET_HEADER_LEN};
19use crate::protocol::pre_login::{build_pre_login_packet, parse_server_encrypt, PreLoginError};
20use crate::protocol::query::{build_sql_batch_packet, parse_query_response, QueryOutput};
21use crate::protocol::rpc::{
22    build_execute_sql_packet, build_prepare_packet, build_unprepare_packet,
23};
24use crate::protocol::token::{parse_login_response, EnvChange, LoginResponse, TokenParseError};
25use crate::tls::TlsPreloginStream;
26use crate::{
27    ssrp, Encrypt, Mssql, MssqlArguments, MssqlConnectOptions, MssqlQueryResult, MssqlRow,
28    MssqlStatement, MssqlTypeInfo,
29};
30
/// SQL Server connection.
///
/// Owns the TDS wire stream together with the transaction bookkeeping that is
/// tracked on the client side.
#[derive(Debug)]
pub struct MssqlConnection {
    /// Wire stream to the server; `close`/`close_hard` take it out before shutting it down.
    stream: Option<MssqlWireStream>,
    /// Transaction nesting depth as tracked by this connection.
    transaction_depth: usize,
    /// Descriptor from the latest begin-transaction ENVCHANGE (0 after a commit or
    /// rollback ENVCHANGE); handed to every request packet builder.
    transaction_descriptor: u64,
    /// Rollback statement queued by `queue_rollback`, sent before the next request.
    pending_rollback_sql: Option<&'static str>,
}
39
40impl MssqlConnection {
41    /// Establishes a SQL Server TCP connection and completes PRELOGIN and LOGIN7.
42    pub async fn establish(options: &MssqlConnectOptions) -> Result<Self, Error> {
43        let mut stream = MssqlWireStream::connect(options).await?;
44
45        let pre_login = build_pre_login_packet(options).map_err(pre_login_error)?;
46        stream.write_all(&pre_login).await?;
47
48        let pre_login_response = stream.read_message().await?;
49        if pre_login_response.packet_type != PacketType::TABULAR_RESULT {
50            return Err(Error::Protocol(format!(
51                "expected SQL Server PRELOGIN response as tabular result, got packet type 0x{:02x}",
52                pre_login_response.packet_type.code()
53            )));
54        }
55
56        let server_encrypt =
57            parse_server_encrypt(&pre_login_response.payload).map_err(pre_login_error)?;
58        let encrypted = negotiate_encryption(options.encrypt(), server_encrypt)?;
59
60        if encrypted {
61            stream.enable_tls(options).await?;
62        }
63
64        let login = build_login7_packet(options).map_err(login_error)?;
65        stream.write_all(&login).await?;
66
67        let login_response = stream.read_message().await?;
68        if login_response.packet_type != PacketType::TABULAR_RESULT {
69            return Err(Error::Protocol(format!(
70                "expected SQL Server LOGIN7 response as tabular result, got packet type 0x{:02x}",
71                login_response.packet_type.code()
72            )));
73        }
74
75        match parse_login_response(&login_response.payload).map_err(token_error)? {
76            LoginResponse::Success { env_changes, .. } => {
77                let mut conn = Self {
78                    stream: Some(stream),
79                    transaction_depth: 0,
80                    transaction_descriptor: 0,
81                    pending_rollback_sql: None,
82                };
83                conn.apply_env_changes(&env_changes);
84                Ok(conn)
85            }
86            LoginResponse::ServerError(error) => Err(server_error(error)),
87        }
88    }
89
90    fn apply_env_changes(&mut self, env_changes: &[EnvChange]) {
91        for change in env_changes {
92            match change {
93                EnvChange::PacketSize(size) => {
94                    if let Some(stream) = self.stream.as_mut() {
95                        stream.packet_size = (*size).clamp(512, 32767) as usize;
96                    }
97                }
98                EnvChange::BeginTransaction(descriptor) => {
99                    self.transaction_descriptor = *descriptor;
100                }
101                EnvChange::CommitTransaction(_) | EnvChange::RollbackTransaction(_) => {
102                    self.transaction_descriptor = 0;
103                }
104                _ => {}
105            }
106        }
107    }
108
109    /// Returns the current transaction depth tracked by the connection.
110    pub const fn transaction_depth(&self) -> usize {
111        self.transaction_depth
112    }
113
114    pub(crate) fn increment_transaction_depth(&mut self) {
115        self.transaction_depth += 1;
116    }
117
118    pub(crate) fn decrement_transaction_depth(&mut self) {
119        self.transaction_depth = self.transaction_depth.saturating_sub(1);
120    }
121
122    pub(crate) fn clear_transaction_depth(&mut self) {
123        self.transaction_depth = 0;
124    }
125
126    pub(crate) async fn run_sql_batch(&mut self, sql: &str) -> Result<QueryOutput, Error> {
127        self.flush_pending_rollback().await?;
128        self.run_sql_batch_direct(sql).await
129    }
130
131    async fn run_sql_batch_direct(&mut self, sql: &str) -> Result<QueryOutput, Error> {
132        let transaction_descriptor = self.transaction_descriptor;
133        let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
134        let packet = build_sql_batch_packet(sql, stream.packet_size, transaction_descriptor)
135            .map_err(frame_error)?;
136        stream.write_all(&packet).await?;
137
138        self.read_query_response().await
139    }
140
141    pub(crate) fn queue_rollback(&mut self) {
142        let sql = match self.transaction_depth {
143            0 => return,
144            1 => {
145                self.transaction_depth = 0;
146                "ROLLBACK TRANSACTION"
147            }
148            _ => {
149                self.transaction_depth -= 1;
150                "ROLLBACK TRANSACTION sqlx_savepoint"
151            }
152        };
153
154        self.pending_rollback_sql = Some(sql);
155    }
156
157    async fn flush_pending_rollback(&mut self) -> Result<(), Error> {
158        let Some(sql) = self.pending_rollback_sql.take() else {
159            return Ok(());
160        };
161
162        self.run_sql_batch_direct(sql).await?;
163        Ok(())
164    }
165
166    pub(crate) async fn run_execute_sql(
167        &mut self,
168        sql: &str,
169        arguments: Option<&MssqlArguments>,
170    ) -> Result<QueryOutput, Error> {
171        self.flush_pending_rollback().await?;
172
173        match arguments {
174            Some(arguments) if !arguments.is_empty() => {
175                let transaction_descriptor = self.transaction_descriptor;
176                let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
177                let packet = build_execute_sql_packet(
178                    sql,
179                    arguments,
180                    stream.packet_size,
181                    transaction_descriptor,
182                )
183                .map_err(|error| {
184                    Error::Protocol(format!("failed to encode SQL Server RPC: {error}"))
185                })?;
186                stream.write_all(&packet).await?;
187                self.read_query_response().await
188            }
189            _ => self.run_sql_batch_direct(sql).await,
190        }
191    }
192
193    pub(crate) async fn run_prepare(
194        &mut self,
195        sql: &str,
196        parameters: &[MssqlTypeInfo],
197    ) -> Result<QueryOutput, Error> {
198        self.flush_pending_rollback().await?;
199
200        let transaction_descriptor = self.transaction_descriptor;
201        let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
202        let packet =
203            build_prepare_packet(sql, parameters, stream.packet_size, transaction_descriptor)
204                .map_err(|error| {
205                    Error::Protocol(format!("failed to encode SQL Server prepare RPC: {error}"))
206                })?;
207        stream.write_all(&packet).await?;
208
209        let output = self.read_query_response().await?;
210
211        if let Some(statement_id) = first_i32_return_value(&output)? {
212            let transaction_descriptor = self.transaction_descriptor;
213            let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
214            let packet =
215                build_unprepare_packet(statement_id, stream.packet_size, transaction_descriptor)
216                    .map_err(|error| {
217                        Error::Protocol(format!(
218                            "failed to encode SQL Server unprepare RPC: {error}"
219                        ))
220                    })?;
221            stream.write_all(&packet).await?;
222            let _ = self.read_query_response().await?;
223        }
224
225        Ok(output)
226    }
227
228    async fn read_query_response(&mut self) -> Result<QueryOutput, Error> {
229        let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
230        let response = stream.read_message().await?;
231        if response.packet_type != PacketType::TABULAR_RESULT {
232            return Err(Error::Protocol(format!(
233                "expected SQL Server query response as tabular result, got packet type 0x{:02x}",
234                response.packet_type.code()
235            )));
236        }
237
238        let output = parse_query_response(&response.payload)?;
239        self.apply_env_changes(&output.env_changes);
240        Ok(output)
241    }
242}
243
244impl Connection for MssqlConnection {
245    type Database = Mssql;
246    type Options = MssqlConnectOptions;
247
248    async fn close(mut self) -> Result<(), Error> {
249        self.flush_pending_rollback().await?;
250
251        if let Some(mut stream) = self.stream.take() {
252            stream.shutdown().await?;
253        }
254
255        Ok(())
256    }
257
258    async fn close_hard(mut self) -> Result<(), Error> {
259        if let Some(mut stream) = self.stream.take() {
260            stream.shutdown().await?;
261        }
262
263        Ok(())
264    }
265
266    async fn ping(&mut self) -> Result<(), Error> {
267        self.flush_pending_rollback().await?;
268
269        if self.stream.is_some() {
270            Ok(())
271        } else {
272            Err(wire_not_implemented())
273        }
274    }
275
276    fn begin(
277        &mut self,
278    ) -> impl std::future::Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
279    {
280        Transaction::begin(self, None)
281    }
282
283    fn shrink_buffers(&mut self) {}
284
285    async fn flush(&mut self) -> Result<(), Error> {
286        Ok(())
287    }
288
289    fn should_flush(&self) -> bool {
290        false
291    }
292}
293
294impl<'c> Executor<'c> for &'c mut MssqlConnection {
295    type Database = Mssql;
296
297    fn fetch_many<'e, 'q, E>(
298        self,
299        mut query: E,
300    ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
301    where
302        'c: 'e,
303        E: Execute<'q, Self::Database>,
304        'q: 'e,
305        E: 'q,
306    {
307        let arguments = query.take_arguments().map_err(Error::Encode);
308        let sql = query.sql();
309
310        stream::once(async move {
311            let arguments = arguments?;
312            self.run_execute_sql(sql.as_str(), arguments.as_ref()).await
313        })
314        .map(|result| match result {
315            Ok(output) => stream_query_output(output),
316            Err(error) => stream::once(future::ready(Err(error))).boxed(),
317        })
318        .flatten()
319        .boxed()
320    }
321
322    fn fetch_optional<'e, 'q, E>(
323        self,
324        mut query: E,
325    ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
326    where
327        'c: 'e,
328        E: Execute<'q, Self::Database>,
329        'q: 'e,
330        E: 'q,
331    {
332        let arguments = query.take_arguments().map_err(Error::Encode);
333        let sql = query.sql();
334
335        Box::pin(async move {
336            let arguments = arguments?;
337            Ok(self
338                .run_execute_sql(sql.as_str(), arguments.as_ref())
339                .await?
340                .rows
341                .into_iter()
342                .next())
343        })
344    }
345
346    fn prepare_with<'e>(
347        self,
348        sql: sqlx_core::sql_str::SqlStr,
349        parameters: &'e [crate::MssqlTypeInfo],
350    ) -> BoxFuture<'e, Result<MssqlStatement, Error>>
351    where
352        'c: 'e,
353    {
354        Box::pin(async move {
355            let output = self.run_prepare(sql.as_str(), parameters).await?;
356            let parameters = if parameters.is_empty() {
357                None
358            } else {
359                Some(Either::Left(parameters.to_vec()))
360            };
361
362            Ok(MssqlStatement::with_parameters(
363                sql,
364                output.columns,
365                parameters,
366            ))
367        })
368    }
369}
370
371fn first_i32_return_value(output: &QueryOutput) -> Result<Option<i32>, Error> {
372    output
373        .return_values
374        .first()
375        .map(|value| {
376            <i32 as Decode<Mssql>>::decode(value.as_ref()).map_err(|error| Error::ColumnDecode {
377                index: "return value".to_owned(),
378                source: error,
379            })
380        })
381        .transpose()
382}
383
384pub(crate) fn wire_not_implemented() -> Error {
385    Error::Protocol("SQL Server connection stream is not available".to_owned())
386}
387
/// TDS wire stream: the transport plus the packet size used for framing.
struct MssqlWireStream {
    /// Plain TCP or TLS-wrapped transport.
    stream: MssqlStream,
    /// Largest packet length `read_message` accepts; also the size handed to the
    /// request packet builders. Starts as the requested packet size.
    packet_size: usize,
}
392
393impl std::fmt::Debug for MssqlWireStream {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        f.debug_struct("MssqlWireStream")
396            .field("encrypted", &matches!(self.stream, MssqlStream::Tls(_)))
397            .field("packet_size", &self.packet_size)
398            .finish()
399    }
400}
401
/// Transport underneath a `MssqlWireStream`.
enum MssqlStream {
    /// Unencrypted TCP stream.
    Raw(TcpStream),
    /// TLS session established by `enable_tls` on top of the TCP stream.
    Tls(tokio_native_tls::TlsStream<TlsPreloginStream<TcpStream>>),
    /// Placeholder left behind while `enable_tls` owns the TCP stream; the I/O
    /// methods reject it with an error.
    Taken,
}
407
408impl MssqlWireStream {
409    async fn connect(options: &MssqlConnectOptions) -> Result<Self, Error> {
410        let port = match (options.port(), options.instance()) {
411            (Some(port), _) => port,
412            (None, Some(instance)) => ssrp::resolve_instance_port(options.host(), instance).await?,
413            (None, None) => 1433,
414        };
415
416        let stream = TcpStream::connect((options.host(), port)).await?;
417        let packet_size = usize::try_from(options.requested_packet_size()).map_err(|_| {
418            Error::Protocol(format!(
419                "SQL Server packet size {} does not fit usize",
420                options.requested_packet_size()
421            ))
422        })?;
423
424        Ok(Self {
425            stream: MssqlStream::Raw(stream),
426            packet_size,
427        })
428    }
429
430    async fn write_all(&mut self, bytes: &[u8]) -> Result<(), Error> {
431        match &mut self.stream {
432            MssqlStream::Raw(stream) => {
433                write_tds_packets(stream, bytes).await?;
434            }
435            MssqlStream::Tls(stream) => {
436                write_tds_packets(stream, bytes).await?;
437            }
438            MssqlStream::Taken => return Err(taken_stream_error()),
439        }
440        Ok(())
441    }
442
443    async fn shutdown(&mut self) -> Result<(), Error> {
444        match &mut self.stream {
445            MssqlStream::Raw(stream) => stream.shutdown().await?,
446            MssqlStream::Tls(stream) => stream.shutdown().await?,
447            MssqlStream::Taken => return Err(taken_stream_error()),
448        }
449        Ok(())
450    }
451
452    async fn enable_tls(&mut self, options: &MssqlConnectOptions) -> Result<(), Error> {
453        let stream = match std::mem::replace(&mut self.stream, MssqlStream::Taken) {
454            MssqlStream::Raw(stream) => stream,
455            other => {
456                self.stream = other;
457                return Ok(());
458            }
459        };
460
461        let mut stream = TlsPreloginStream::new(stream);
462        stream.start_handshake();
463
464        let domain = options
465            .hostname_in_certificate()
466            .unwrap_or_else(|| options.host());
467        let connector = build_tls_connector(options)?;
468        let mut stream = connector
469            .connect(domain, stream)
470            .await
471            .map_err(|error| {
472                Error::Tls(
473                    std::io::Error::other(format!(
474                        "SQL Server TLS handshake failed for host `{}` during the TDS PRELOGIN encryption upgrade \
475                         (encrypt={:?}, trust_server_certificate={}, hostname_in_certificate={}, ssl_root_cert={}): {}",
476                        domain,
477                        options.encrypt(),
478                        options.trust_server_certificate(),
479                        options.hostname_in_certificate().unwrap_or("<not set>"),
480                        options.ssl_root_cert().is_some(),
481                        error
482                    ))
483                    .into(),
484                )
485            })?;
486        stream.get_mut().get_mut().get_mut().finish_handshake();
487
488        self.stream = MssqlStream::Tls(stream);
489        Ok(())
490    }
491
492    async fn read_message(&mut self) -> Result<WireMessage, Error> {
493        let mut packet_type = None;
494        let mut expected_packet_id = None;
495        let mut payload = Vec::new();
496
497        loop {
498            let mut header_bytes = [0u8; PACKET_HEADER_LEN];
499            self.read_exact(&mut header_bytes).await?;
500            let header = PacketHeader::decode(&header_bytes).map_err(packet_error)?;
501
502            if let Some(packet_type) = packet_type {
503                if header.packet_type != packet_type {
504                    return Err(Error::Protocol(format!(
505                        "mismatched SQL Server packet type: expected 0x{:02x}, got 0x{:02x}",
506                        packet_type.code(),
507                        header.packet_type.code()
508                    )));
509                }
510            } else {
511                packet_type = Some(header.packet_type);
512            }
513
514            if let Some(packet_id) = expected_packet_id {
515                if header.packet_id != packet_id {
516                    return Err(Error::Protocol(format!(
517                        "non-contiguous SQL Server packet id: expected {packet_id}, got {}",
518                        header.packet_id
519                    )));
520                }
521            }
522
523            let packet_len = usize::from(header.length);
524            if packet_len > self.packet_size {
525                return Err(Error::Protocol(format!(
526                    "SQL Server packet length {packet_len} exceeds negotiated packet size {}",
527                    self.packet_size
528                )));
529            }
530
531            let payload_len = packet_len.checked_sub(PACKET_HEADER_LEN).ok_or_else(|| {
532                Error::Protocol("SQL Server packet header length underflow".to_owned())
533            })?;
534            let old_len = payload.len();
535            payload.resize(old_len + payload_len, 0);
536            self.read_exact(&mut payload[old_len..]).await?;
537
538            expected_packet_id = Some(header.packet_id.wrapping_add(1));
539
540            if header.status == PacketStatus::END_OF_MESSAGE {
541                return Ok(WireMessage {
542                    packet_type: packet_type.expect("packet_type is set after first header"),
543                    payload,
544                });
545            }
546        }
547    }
548
549    async fn read_exact(&mut self, bytes: &mut [u8]) -> Result<(), Error> {
550        match &mut self.stream {
551            MssqlStream::Raw(stream) => {
552                stream.read_exact(bytes).await?;
553            }
554            MssqlStream::Tls(stream) => {
555                stream.read_exact(bytes).await?;
556            }
557            MssqlStream::Taken => return Err(taken_stream_error()),
558        }
559
560        Ok(())
561    }
562}
563
564async fn write_tds_packets<S>(stream: &mut S, bytes: &[u8]) -> Result<(), Error>
565where
566    S: AsyncWrite + Unpin,
567{
568    let mut offset = 0usize;
569
570    while offset < bytes.len() {
571        let packet = tds_packet_slice(bytes, offset)?;
572        stream.write_all(packet).await?;
573        stream.flush().await?;
574        offset += packet.len();
575    }
576
577    Ok(())
578}
579
580fn tds_packet_slice(bytes: &[u8], offset: usize) -> Result<&[u8], Error> {
581    let header_end = offset
582        .checked_add(PACKET_HEADER_LEN)
583        .ok_or_else(|| Error::Protocol("SQL Server outbound packet offset overflow".to_owned()))?;
584    let header_bytes = bytes.get(offset..header_end).ok_or_else(|| {
585        Error::Protocol("SQL Server outbound packet buffer ended inside a header".to_owned())
586    })?;
587    let header = PacketHeader::decode(header_bytes).map_err(packet_error)?;
588    let packet_len = usize::from(header.length);
589    let packet_end = offset
590        .checked_add(packet_len)
591        .ok_or_else(|| Error::Protocol("SQL Server outbound packet length overflow".to_owned()))?;
592
593    bytes.get(offset..packet_end).ok_or_else(|| {
594        Error::Protocol("SQL Server outbound packet buffer ended inside a packet".to_owned())
595    })
596}
597
/// A complete TDS message reassembled from one or more packets.
#[derive(Debug)]
struct WireMessage {
    /// Packet type shared by every packet of the message.
    packet_type: PacketType,
    /// Packet payloads joined in arrival order, with the packet headers removed.
    payload: Vec<u8>,
}
603
604fn negotiate_encryption(requested: Encrypt, server: Encrypt) -> std::result::Result<bool, Error> {
605    match (requested, server) {
606        (Encrypt::NotSupported, Encrypt::NotSupported | Encrypt::Off) => Ok(false),
607        (Encrypt::NotSupported, Encrypt::On | Encrypt::Required) => Err(Error::Protocol(
608            "SQL Server requires encryption, but the client URL requested encrypt=not_supported"
609                .to_owned(),
610        )),
611        (Encrypt::Required, Encrypt::Off | Encrypt::NotSupported) => Err(Error::Tls(
612            "SQL Server TLS encryption is required but not supported by the server".into(),
613        )),
614        (Encrypt::On | Encrypt::Required, Encrypt::On | Encrypt::Required) => Ok(true),
615        (Encrypt::Off, _) | (_, Encrypt::Off) => Err(Error::Protocol(
616            "SQL Server login-only TLS fallback is not implemented yet; use encrypt=mandatory or encrypt=strict for encrypted connections, or encrypt=not_supported for plaintext development servers"
617                .to_owned(),
618        )),
619        (Encrypt::On, Encrypt::NotSupported) => Ok(false),
620    }
621}
622
623fn build_tls_connector(options: &MssqlConnectOptions) -> Result<TlsConnector, Error> {
624    let mut builder = native_tls::TlsConnector::builder();
625    builder.danger_accept_invalid_certs(options.trust_server_certificate());
626    builder.danger_accept_invalid_hostnames(options.hostname_in_certificate().is_none());
627
628    if let Some(path) = options.ssl_root_cert() {
629        let cert = std::fs::read(path).map_err(Error::Io)?;
630        let cert = Certificate::from_pem(&cert)
631            .or_else(|_| Certificate::from_der(&cert))
632            .map_err(|error| Error::Tls(error.into()))?;
633        builder.add_root_certificate(cert);
634    }
635
636    builder
637        .build()
638        .map(TlsConnector::from)
639        .map_err(|error| Error::Tls(error.into()))
640}
641
642fn taken_stream_error() -> Error {
643    Error::Protocol("SQL Server stream was used while TLS upgrade was in progress".to_owned())
644}
645
646fn packet_error(error: crate::protocol::packet::PacketHeaderError) -> Error {
647    Error::Protocol(error.to_string())
648}
649
650fn pre_login_error(error: PreLoginError) -> Error {
651    Error::Protocol(error.to_string())
652}
653
654fn login_error(error: Login7Error) -> Error {
655    Error::Protocol(error.to_string())
656}
657
658fn token_error(error: TokenParseError) -> Error {
659    Error::Protocol(error.to_string())
660}
661
662fn frame_error(error: crate::protocol::packet::PacketFrameError) -> Error {
663    Error::Protocol(error.to_string())
664}
665
666fn stream_query_output(
667    output: QueryOutput,
668) -> BoxStream<'static, Result<Either<MssqlQueryResult, MssqlRow>, Error>> {
669    stream::iter(
670        output
671            .rows
672            .into_iter()
673            .map(|row| Ok(Either::Right(row)))
674            .chain(std::iter::once(Ok(Either::Left(output.result)))),
675    )
676    .boxed()
677}
678
#[cfg(test)]
mod tests {
    use super::*;

    /// Matching encryption requests on both sides upgrade the whole connection.
    #[test]
    fn negotiates_full_tls_for_required_or_mandatory_encryption() {
        let pairs = [
            (Encrypt::On, Encrypt::On),
            (Encrypt::Required, Encrypt::Required),
        ];

        for (requested, server) in pairs {
            assert!(negotiate_encryption(requested, server).unwrap());
        }
    }

    /// Plaintext needs an explicit client opt-out and a server that tolerates it.
    #[test]
    fn allows_plaintext_only_when_explicitly_requested_and_supported() {
        let encrypted = negotiate_encryption(Encrypt::NotSupported, Encrypt::Off).unwrap();
        assert!(!encrypted);

        let refused = negotiate_encryption(Encrypt::NotSupported, Encrypt::Required);
        assert!(refused.is_err());
    }

    /// Combinations that would need login-only TLS are rejected.
    #[test]
    fn rejects_login_only_tls_fallback_until_downgrade_is_available() {
        let pairs = [(Encrypt::Off, Encrypt::On), (Encrypt::On, Encrypt::Off)];

        for (requested, server) in pairs {
            assert!(negotiate_encryption(requested, server).is_err());
        }
    }

    /// An 11-byte payload framed with a 12-byte packet size is sliced as 12, 12, 11.
    #[test]
    fn slices_encoded_outbound_packets_by_header_length() {
        let encoded =
            crate::protocol::packet::encode_message(PacketType::RPC, &[0; 11], 12).unwrap();

        let mut offset = 0;
        for expected_len in [12, 12, 11] {
            let packet = tds_packet_slice(&encoded, offset).unwrap();
            assert_eq!(expected_len, packet.len());
            offset += packet.len();
        }
    }

    /// Dropping the final byte leaves the last packet shorter than its header claims.
    #[test]
    fn rejects_truncated_outbound_packet() {
        let encoded =
            crate::protocol::packet::encode_message(PacketType::RPC, &[0; 11], 12).unwrap();
        let truncated = &encoded[..encoded.len() - 1];

        let error = tds_packet_slice(truncated, 24).unwrap_err();

        assert!(error.to_string().contains("ended inside a packet"));
    }
}
722}