1use futures_core::future::BoxFuture;
2use futures_core::stream::BoxStream;
3use futures_util::{future, stream, StreamExt};
4use native_tls::Certificate;
5use sqlx_core::connection::Connection;
6use sqlx_core::decode::Decode;
7use sqlx_core::error::Error;
8use sqlx_core::executor::{Execute, Executor};
9use sqlx_core::transaction::Transaction;
10use sqlx_core::value::Value;
11use sqlx_core::Either;
12use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
13use tokio::net::TcpStream;
14use tokio_native_tls::TlsConnector;
15
16use crate::error::server_error;
17use crate::protocol::login::{build_login7_packet, Login7Error};
18use crate::protocol::packet::{PacketHeader, PacketStatus, PacketType, PACKET_HEADER_LEN};
19use crate::protocol::pre_login::{build_pre_login_packet, parse_server_encrypt, PreLoginError};
20use crate::protocol::query::{build_sql_batch_packet, parse_query_response, QueryOutput};
21use crate::protocol::rpc::{
22 build_execute_sql_packet, build_prepare_packet, build_unprepare_packet,
23};
24use crate::protocol::token::{parse_login_response, EnvChange, LoginResponse, TokenParseError};
25use crate::tls::TlsPreloginStream;
26use crate::{
27 ssrp, Encrypt, Mssql, MssqlArguments, MssqlConnectOptions, MssqlQueryResult, MssqlRow,
28 MssqlStatement, MssqlTypeInfo,
29};
30
31#[derive(Debug)]
33pub struct MssqlConnection {
34 stream: Option<MssqlWireStream>,
35 transaction_depth: usize,
36 transaction_descriptor: u64,
37 pending_rollback_sql: Option<&'static str>,
38}
39
40impl MssqlConnection {
41 pub async fn establish(options: &MssqlConnectOptions) -> Result<Self, Error> {
43 let mut stream = MssqlWireStream::connect(options).await?;
44
45 let pre_login = build_pre_login_packet(options).map_err(pre_login_error)?;
46 stream.write_all(&pre_login).await?;
47
48 let pre_login_response = stream.read_message().await?;
49 if pre_login_response.packet_type != PacketType::TABULAR_RESULT {
50 return Err(Error::Protocol(format!(
51 "expected SQL Server PRELOGIN response as tabular result, got packet type 0x{:02x}",
52 pre_login_response.packet_type.code()
53 )));
54 }
55
56 let server_encrypt =
57 parse_server_encrypt(&pre_login_response.payload).map_err(pre_login_error)?;
58 let encrypted = negotiate_encryption(options.encrypt(), server_encrypt)?;
59
60 if encrypted {
61 stream.enable_tls(options).await?;
62 }
63
64 let login = build_login7_packet(options).map_err(login_error)?;
65 stream.write_all(&login).await?;
66
67 let login_response = stream.read_message().await?;
68 if login_response.packet_type != PacketType::TABULAR_RESULT {
69 return Err(Error::Protocol(format!(
70 "expected SQL Server LOGIN7 response as tabular result, got packet type 0x{:02x}",
71 login_response.packet_type.code()
72 )));
73 }
74
75 match parse_login_response(&login_response.payload).map_err(token_error)? {
76 LoginResponse::Success { env_changes, .. } => {
77 let mut conn = Self {
78 stream: Some(stream),
79 transaction_depth: 0,
80 transaction_descriptor: 0,
81 pending_rollback_sql: None,
82 };
83 conn.apply_env_changes(&env_changes);
84 Ok(conn)
85 }
86 LoginResponse::ServerError(error) => Err(server_error(error)),
87 }
88 }
89
90 fn apply_env_changes(&mut self, env_changes: &[EnvChange]) {
91 for change in env_changes {
92 match change {
93 EnvChange::PacketSize(size) => {
94 if let Some(stream) = self.stream.as_mut() {
95 stream.packet_size = (*size).clamp(512, 32767) as usize;
96 }
97 }
98 EnvChange::BeginTransaction(descriptor) => {
99 self.transaction_descriptor = *descriptor;
100 }
101 EnvChange::CommitTransaction(_) | EnvChange::RollbackTransaction(_) => {
102 self.transaction_descriptor = 0;
103 }
104 _ => {}
105 }
106 }
107 }
108
109 pub const fn transaction_depth(&self) -> usize {
111 self.transaction_depth
112 }
113
114 pub(crate) fn increment_transaction_depth(&mut self) {
115 self.transaction_depth += 1;
116 }
117
118 pub(crate) fn decrement_transaction_depth(&mut self) {
119 self.transaction_depth = self.transaction_depth.saturating_sub(1);
120 }
121
122 pub(crate) fn clear_transaction_depth(&mut self) {
123 self.transaction_depth = 0;
124 }
125
126 pub(crate) async fn run_sql_batch(&mut self, sql: &str) -> Result<QueryOutput, Error> {
127 self.flush_pending_rollback().await?;
128 self.run_sql_batch_direct(sql).await
129 }
130
131 async fn run_sql_batch_direct(&mut self, sql: &str) -> Result<QueryOutput, Error> {
132 let transaction_descriptor = self.transaction_descriptor;
133 let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
134 let packet = build_sql_batch_packet(sql, stream.packet_size, transaction_descriptor)
135 .map_err(frame_error)?;
136 stream.write_all(&packet).await?;
137
138 self.read_query_response().await
139 }
140
141 pub(crate) fn queue_rollback(&mut self) {
142 let sql = match self.transaction_depth {
143 0 => return,
144 1 => {
145 self.transaction_depth = 0;
146 "ROLLBACK TRANSACTION"
147 }
148 _ => {
149 self.transaction_depth -= 1;
150 "ROLLBACK TRANSACTION sqlx_savepoint"
151 }
152 };
153
154 self.pending_rollback_sql = Some(sql);
155 }
156
157 async fn flush_pending_rollback(&mut self) -> Result<(), Error> {
158 let Some(sql) = self.pending_rollback_sql.take() else {
159 return Ok(());
160 };
161
162 self.run_sql_batch_direct(sql).await?;
163 Ok(())
164 }
165
166 pub(crate) async fn run_execute_sql(
167 &mut self,
168 sql: &str,
169 arguments: Option<&MssqlArguments>,
170 ) -> Result<QueryOutput, Error> {
171 self.flush_pending_rollback().await?;
172
173 match arguments {
174 Some(arguments) if !arguments.is_empty() => {
175 let transaction_descriptor = self.transaction_descriptor;
176 let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
177 let packet = build_execute_sql_packet(
178 sql,
179 arguments,
180 stream.packet_size,
181 transaction_descriptor,
182 )
183 .map_err(|error| {
184 Error::Protocol(format!("failed to encode SQL Server RPC: {error}"))
185 })?;
186 stream.write_all(&packet).await?;
187 self.read_query_response().await
188 }
189 _ => self.run_sql_batch_direct(sql).await,
190 }
191 }
192
193 pub(crate) async fn run_prepare(
194 &mut self,
195 sql: &str,
196 parameters: &[MssqlTypeInfo],
197 ) -> Result<QueryOutput, Error> {
198 self.flush_pending_rollback().await?;
199
200 let transaction_descriptor = self.transaction_descriptor;
201 let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
202 let packet =
203 build_prepare_packet(sql, parameters, stream.packet_size, transaction_descriptor)
204 .map_err(|error| {
205 Error::Protocol(format!("failed to encode SQL Server prepare RPC: {error}"))
206 })?;
207 stream.write_all(&packet).await?;
208
209 let output = self.read_query_response().await?;
210
211 if let Some(statement_id) = first_i32_return_value(&output)? {
212 let transaction_descriptor = self.transaction_descriptor;
213 let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
214 let packet =
215 build_unprepare_packet(statement_id, stream.packet_size, transaction_descriptor)
216 .map_err(|error| {
217 Error::Protocol(format!(
218 "failed to encode SQL Server unprepare RPC: {error}"
219 ))
220 })?;
221 stream.write_all(&packet).await?;
222 let _ = self.read_query_response().await?;
223 }
224
225 Ok(output)
226 }
227
228 async fn read_query_response(&mut self) -> Result<QueryOutput, Error> {
229 let stream = self.stream.as_mut().ok_or_else(wire_not_implemented)?;
230 let response = stream.read_message().await?;
231 if response.packet_type != PacketType::TABULAR_RESULT {
232 return Err(Error::Protocol(format!(
233 "expected SQL Server query response as tabular result, got packet type 0x{:02x}",
234 response.packet_type.code()
235 )));
236 }
237
238 let output = parse_query_response(&response.payload)?;
239 self.apply_env_changes(&output.env_changes);
240 Ok(output)
241 }
242}
243
244impl Connection for MssqlConnection {
245 type Database = Mssql;
246 type Options = MssqlConnectOptions;
247
248 async fn close(mut self) -> Result<(), Error> {
249 self.flush_pending_rollback().await?;
250
251 if let Some(mut stream) = self.stream.take() {
252 stream.shutdown().await?;
253 }
254
255 Ok(())
256 }
257
258 async fn close_hard(mut self) -> Result<(), Error> {
259 if let Some(mut stream) = self.stream.take() {
260 stream.shutdown().await?;
261 }
262
263 Ok(())
264 }
265
266 async fn ping(&mut self) -> Result<(), Error> {
267 self.flush_pending_rollback().await?;
268
269 if self.stream.is_some() {
270 Ok(())
271 } else {
272 Err(wire_not_implemented())
273 }
274 }
275
276 fn begin(
277 &mut self,
278 ) -> impl std::future::Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
279 {
280 Transaction::begin(self, None)
281 }
282
283 fn shrink_buffers(&mut self) {}
284
285 async fn flush(&mut self) -> Result<(), Error> {
286 Ok(())
287 }
288
289 fn should_flush(&self) -> bool {
290 false
291 }
292}
293
294impl<'c> Executor<'c> for &'c mut MssqlConnection {
295 type Database = Mssql;
296
297 fn fetch_many<'e, 'q, E>(
298 self,
299 mut query: E,
300 ) -> BoxStream<'e, Result<Either<MssqlQueryResult, MssqlRow>, Error>>
301 where
302 'c: 'e,
303 E: Execute<'q, Self::Database>,
304 'q: 'e,
305 E: 'q,
306 {
307 let arguments = query.take_arguments().map_err(Error::Encode);
308 let sql = query.sql();
309
310 stream::once(async move {
311 let arguments = arguments?;
312 self.run_execute_sql(sql.as_str(), arguments.as_ref()).await
313 })
314 .map(|result| match result {
315 Ok(output) => stream_query_output(output),
316 Err(error) => stream::once(future::ready(Err(error))).boxed(),
317 })
318 .flatten()
319 .boxed()
320 }
321
322 fn fetch_optional<'e, 'q, E>(
323 self,
324 mut query: E,
325 ) -> BoxFuture<'e, Result<Option<MssqlRow>, Error>>
326 where
327 'c: 'e,
328 E: Execute<'q, Self::Database>,
329 'q: 'e,
330 E: 'q,
331 {
332 let arguments = query.take_arguments().map_err(Error::Encode);
333 let sql = query.sql();
334
335 Box::pin(async move {
336 let arguments = arguments?;
337 Ok(self
338 .run_execute_sql(sql.as_str(), arguments.as_ref())
339 .await?
340 .rows
341 .into_iter()
342 .next())
343 })
344 }
345
346 fn prepare_with<'e>(
347 self,
348 sql: sqlx_core::sql_str::SqlStr,
349 parameters: &'e [crate::MssqlTypeInfo],
350 ) -> BoxFuture<'e, Result<MssqlStatement, Error>>
351 where
352 'c: 'e,
353 {
354 Box::pin(async move {
355 let output = self.run_prepare(sql.as_str(), parameters).await?;
356 let parameters = if parameters.is_empty() {
357 None
358 } else {
359 Some(Either::Left(parameters.to_vec()))
360 };
361
362 Ok(MssqlStatement::with_parameters(
363 sql,
364 output.columns,
365 parameters,
366 ))
367 })
368 }
369}
370
371fn first_i32_return_value(output: &QueryOutput) -> Result<Option<i32>, Error> {
372 output
373 .return_values
374 .first()
375 .map(|value| {
376 <i32 as Decode<Mssql>>::decode(value.as_ref()).map_err(|error| Error::ColumnDecode {
377 index: "return value".to_owned(),
378 source: error,
379 })
380 })
381 .transpose()
382}
383
384pub(crate) fn wire_not_implemented() -> Error {
385 Error::Protocol("SQL Server connection stream is not available".to_owned())
386}
387
388struct MssqlWireStream {
389 stream: MssqlStream,
390 packet_size: usize,
391}
392
393impl std::fmt::Debug for MssqlWireStream {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 f.debug_struct("MssqlWireStream")
396 .field("encrypted", &matches!(self.stream, MssqlStream::Tls(_)))
397 .field("packet_size", &self.packet_size)
398 .finish()
399 }
400}
401
402enum MssqlStream {
403 Raw(TcpStream),
404 Tls(tokio_native_tls::TlsStream<TlsPreloginStream<TcpStream>>),
405 Taken,
406}
407
408impl MssqlWireStream {
409 async fn connect(options: &MssqlConnectOptions) -> Result<Self, Error> {
410 let port = match (options.port(), options.instance()) {
411 (Some(port), _) => port,
412 (None, Some(instance)) => ssrp::resolve_instance_port(options.host(), instance).await?,
413 (None, None) => 1433,
414 };
415
416 let stream = TcpStream::connect((options.host(), port)).await?;
417 let packet_size = usize::try_from(options.requested_packet_size()).map_err(|_| {
418 Error::Protocol(format!(
419 "SQL Server packet size {} does not fit usize",
420 options.requested_packet_size()
421 ))
422 })?;
423
424 Ok(Self {
425 stream: MssqlStream::Raw(stream),
426 packet_size,
427 })
428 }
429
430 async fn write_all(&mut self, bytes: &[u8]) -> Result<(), Error> {
431 match &mut self.stream {
432 MssqlStream::Raw(stream) => {
433 write_tds_packets(stream, bytes).await?;
434 }
435 MssqlStream::Tls(stream) => {
436 write_tds_packets(stream, bytes).await?;
437 }
438 MssqlStream::Taken => return Err(taken_stream_error()),
439 }
440 Ok(())
441 }
442
443 async fn shutdown(&mut self) -> Result<(), Error> {
444 match &mut self.stream {
445 MssqlStream::Raw(stream) => stream.shutdown().await?,
446 MssqlStream::Tls(stream) => stream.shutdown().await?,
447 MssqlStream::Taken => return Err(taken_stream_error()),
448 }
449 Ok(())
450 }
451
452 async fn enable_tls(&mut self, options: &MssqlConnectOptions) -> Result<(), Error> {
453 let stream = match std::mem::replace(&mut self.stream, MssqlStream::Taken) {
454 MssqlStream::Raw(stream) => stream,
455 other => {
456 self.stream = other;
457 return Ok(());
458 }
459 };
460
461 let mut stream = TlsPreloginStream::new(stream);
462 stream.start_handshake();
463
464 let domain = options
465 .hostname_in_certificate()
466 .unwrap_or_else(|| options.host());
467 let connector = build_tls_connector(options)?;
468 let mut stream = connector
469 .connect(domain, stream)
470 .await
471 .map_err(|error| {
472 Error::Tls(
473 std::io::Error::other(format!(
474 "SQL Server TLS handshake failed for host `{}` during the TDS PRELOGIN encryption upgrade \
475 (encrypt={:?}, trust_server_certificate={}, hostname_in_certificate={}, ssl_root_cert={}): {}",
476 domain,
477 options.encrypt(),
478 options.trust_server_certificate(),
479 options.hostname_in_certificate().unwrap_or("<not set>"),
480 options.ssl_root_cert().is_some(),
481 error
482 ))
483 .into(),
484 )
485 })?;
486 stream.get_mut().get_mut().get_mut().finish_handshake();
487
488 self.stream = MssqlStream::Tls(stream);
489 Ok(())
490 }
491
492 async fn read_message(&mut self) -> Result<WireMessage, Error> {
493 let mut packet_type = None;
494 let mut expected_packet_id = None;
495 let mut payload = Vec::new();
496
497 loop {
498 let mut header_bytes = [0u8; PACKET_HEADER_LEN];
499 self.read_exact(&mut header_bytes).await?;
500 let header = PacketHeader::decode(&header_bytes).map_err(packet_error)?;
501
502 if let Some(packet_type) = packet_type {
503 if header.packet_type != packet_type {
504 return Err(Error::Protocol(format!(
505 "mismatched SQL Server packet type: expected 0x{:02x}, got 0x{:02x}",
506 packet_type.code(),
507 header.packet_type.code()
508 )));
509 }
510 } else {
511 packet_type = Some(header.packet_type);
512 }
513
514 if let Some(packet_id) = expected_packet_id {
515 if header.packet_id != packet_id {
516 return Err(Error::Protocol(format!(
517 "non-contiguous SQL Server packet id: expected {packet_id}, got {}",
518 header.packet_id
519 )));
520 }
521 }
522
523 let packet_len = usize::from(header.length);
524 if packet_len > self.packet_size {
525 return Err(Error::Protocol(format!(
526 "SQL Server packet length {packet_len} exceeds negotiated packet size {}",
527 self.packet_size
528 )));
529 }
530
531 let payload_len = packet_len.checked_sub(PACKET_HEADER_LEN).ok_or_else(|| {
532 Error::Protocol("SQL Server packet header length underflow".to_owned())
533 })?;
534 let old_len = payload.len();
535 payload.resize(old_len + payload_len, 0);
536 self.read_exact(&mut payload[old_len..]).await?;
537
538 expected_packet_id = Some(header.packet_id.wrapping_add(1));
539
540 if header.status == PacketStatus::END_OF_MESSAGE {
541 return Ok(WireMessage {
542 packet_type: packet_type.expect("packet_type is set after first header"),
543 payload,
544 });
545 }
546 }
547 }
548
549 async fn read_exact(&mut self, bytes: &mut [u8]) -> Result<(), Error> {
550 match &mut self.stream {
551 MssqlStream::Raw(stream) => {
552 stream.read_exact(bytes).await?;
553 }
554 MssqlStream::Tls(stream) => {
555 stream.read_exact(bytes).await?;
556 }
557 MssqlStream::Taken => return Err(taken_stream_error()),
558 }
559
560 Ok(())
561 }
562}
563
564async fn write_tds_packets<S>(stream: &mut S, bytes: &[u8]) -> Result<(), Error>
565where
566 S: AsyncWrite + Unpin,
567{
568 let mut offset = 0usize;
569
570 while offset < bytes.len() {
571 let packet = tds_packet_slice(bytes, offset)?;
572 stream.write_all(packet).await?;
573 stream.flush().await?;
574 offset += packet.len();
575 }
576
577 Ok(())
578}
579
580fn tds_packet_slice(bytes: &[u8], offset: usize) -> Result<&[u8], Error> {
581 let header_end = offset
582 .checked_add(PACKET_HEADER_LEN)
583 .ok_or_else(|| Error::Protocol("SQL Server outbound packet offset overflow".to_owned()))?;
584 let header_bytes = bytes.get(offset..header_end).ok_or_else(|| {
585 Error::Protocol("SQL Server outbound packet buffer ended inside a header".to_owned())
586 })?;
587 let header = PacketHeader::decode(header_bytes).map_err(packet_error)?;
588 let packet_len = usize::from(header.length);
589 let packet_end = offset
590 .checked_add(packet_len)
591 .ok_or_else(|| Error::Protocol("SQL Server outbound packet length overflow".to_owned()))?;
592
593 bytes.get(offset..packet_end).ok_or_else(|| {
594 Error::Protocol("SQL Server outbound packet buffer ended inside a packet".to_owned())
595 })
596}
597
598#[derive(Debug)]
599struct WireMessage {
600 packet_type: PacketType,
601 payload: Vec<u8>,
602}
603
604fn negotiate_encryption(requested: Encrypt, server: Encrypt) -> std::result::Result<bool, Error> {
605 match (requested, server) {
606 (Encrypt::NotSupported, Encrypt::NotSupported | Encrypt::Off) => Ok(false),
607 (Encrypt::NotSupported, Encrypt::On | Encrypt::Required) => Err(Error::Protocol(
608 "SQL Server requires encryption, but the client URL requested encrypt=not_supported"
609 .to_owned(),
610 )),
611 (Encrypt::Required, Encrypt::Off | Encrypt::NotSupported) => Err(Error::Tls(
612 "SQL Server TLS encryption is required but not supported by the server".into(),
613 )),
614 (Encrypt::On | Encrypt::Required, Encrypt::On | Encrypt::Required) => Ok(true),
615 (Encrypt::Off, _) | (_, Encrypt::Off) => Err(Error::Protocol(
616 "SQL Server login-only TLS fallback is not implemented yet; use encrypt=mandatory or encrypt=strict for encrypted connections, or encrypt=not_supported for plaintext development servers"
617 .to_owned(),
618 )),
619 (Encrypt::On, Encrypt::NotSupported) => Ok(false),
620 }
621}
622
623fn build_tls_connector(options: &MssqlConnectOptions) -> Result<TlsConnector, Error> {
624 let mut builder = native_tls::TlsConnector::builder();
625 builder.danger_accept_invalid_certs(options.trust_server_certificate());
626 builder.danger_accept_invalid_hostnames(options.hostname_in_certificate().is_none());
627
628 if let Some(path) = options.ssl_root_cert() {
629 let cert = std::fs::read(path).map_err(Error::Io)?;
630 let cert = Certificate::from_pem(&cert)
631 .or_else(|_| Certificate::from_der(&cert))
632 .map_err(|error| Error::Tls(error.into()))?;
633 builder.add_root_certificate(cert);
634 }
635
636 builder
637 .build()
638 .map(TlsConnector::from)
639 .map_err(|error| Error::Tls(error.into()))
640}
641
642fn taken_stream_error() -> Error {
643 Error::Protocol("SQL Server stream was used while TLS upgrade was in progress".to_owned())
644}
645
646fn packet_error(error: crate::protocol::packet::PacketHeaderError) -> Error {
647 Error::Protocol(error.to_string())
648}
649
650fn pre_login_error(error: PreLoginError) -> Error {
651 Error::Protocol(error.to_string())
652}
653
654fn login_error(error: Login7Error) -> Error {
655 Error::Protocol(error.to_string())
656}
657
658fn token_error(error: TokenParseError) -> Error {
659 Error::Protocol(error.to_string())
660}
661
662fn frame_error(error: crate::protocol::packet::PacketFrameError) -> Error {
663 Error::Protocol(error.to_string())
664}
665
666fn stream_query_output(
667 output: QueryOutput,
668) -> BoxStream<'static, Result<Either<MssqlQueryResult, MssqlRow>, Error>> {
669 stream::iter(
670 output
671 .rows
672 .into_iter()
673 .map(|row| Ok(Either::Right(row)))
674 .chain(std::iter::once(Ok(Either::Left(output.result)))),
675 )
676 .boxed()
677}
678
679#[cfg(test)]
680mod tests {
681 use super::*;
682
683 #[test]
684 fn negotiates_full_tls_for_required_or_mandatory_encryption() {
685 assert!(negotiate_encryption(Encrypt::On, Encrypt::On).unwrap());
686 assert!(negotiate_encryption(Encrypt::Required, Encrypt::Required).unwrap());
687 }
688
689 #[test]
690 fn allows_plaintext_only_when_explicitly_requested_and_supported() {
691 assert!(!negotiate_encryption(Encrypt::NotSupported, Encrypt::Off).unwrap());
692 assert!(negotiate_encryption(Encrypt::NotSupported, Encrypt::Required).is_err());
693 }
694
695 #[test]
696 fn rejects_login_only_tls_fallback_until_downgrade_is_available() {
697 assert!(negotiate_encryption(Encrypt::Off, Encrypt::On).is_err());
698 assert!(negotiate_encryption(Encrypt::On, Encrypt::Off).is_err());
699 }
700
701 #[test]
702 fn slices_encoded_outbound_packets_by_header_length() {
703 let bytes = crate::protocol::packet::encode_message(PacketType::RPC, &[0; 11], 12).unwrap();
704
705 let first = tds_packet_slice(&bytes, 0).unwrap();
706 assert_eq!(12, first.len());
707
708 let second = tds_packet_slice(&bytes, first.len()).unwrap();
709 assert_eq!(12, second.len());
710
711 let third = tds_packet_slice(&bytes, first.len() + second.len()).unwrap();
712 assert_eq!(11, third.len());
713 }
714
715 #[test]
716 fn rejects_truncated_outbound_packet() {
717 let bytes = crate::protocol::packet::encode_message(PacketType::RPC, &[0; 11], 12).unwrap();
718 let err = tds_packet_slice(&bytes[..bytes.len() - 1], 24).unwrap_err();
719
720 assert!(err.to_string().contains("ended inside a packet"));
721 }
722}