1use core::task::Waker;
7
8use super::{
9 AsyncStatus, Display, DisplayBase, ExtensionMap, Poisonable, Prefetch, RawReply, RawRequest,
10 X11Core,
11};
12use crate::{connection::Connection, Error, InvalidState, Result, ResultExt};
13
14use alloc::{sync::Arc, vec, vec::Vec};
15use x11rb_protocol::{
16 connect::Connect,
17 id_allocator::IdsExhausted,
18 packet_reader::PacketReader,
19 protocol::{
20 bigreq::EnableRequest,
21 xc_misc::GetXIDRangeRequest,
22 xproto::{GetInputFocusRequest, QueryExtensionRequest, Setup},
23 Event,
24 },
25 x11_utils::ExtensionInformation,
26};
27
28use impl_details::{BlockingStrategy, PollingStrategy, Strategy};
29
30cfg_std! {
31 use crate::{connection::BufConnection, NameConnection};
32 use core::mem;
33 use x11rb_protocol::{parse_display, xauth};
34}
35
36cfg_async! {
37 use super::CanBeAsyncDisplay;
38 use core::task::Context;
39 use impl_details::NonBlockingStrategy;
40}
41
42cfg_std_unix! {
43 use std::os::unix::io::{AsRawFd, RawFd};
44}
45
46cfg_std_windows! {
47 use std::os::windows::io::{AsRawSocket, RawSocket};
48}
49
50pub struct BasicDisplay<Conn> {
53 core: X11Core,
55 pub(crate) setup: Arc<Setup>,
57 packet_reader: PacketReader,
59 conn: Poisonable<Conn>,
61 max_request_size: Option<Prefetch<EnableRequest>>,
63 pub(crate) default_screen_index: usize,
65 extension_map: ExtensionMap,
67 async_state: AsyncState,
69}
70
71#[derive(Default)]
72struct AsyncState {
73 #[allow(dead_code)]
75 current_sending: Option<u64>,
76 xid_regeneration: Option<Prefetch<GetXIDRangeRequest>>,
78 synchronization: Option<Prefetch<GetInputFocusRequest>>,
80}
81
82cfg_std! {
83 pub type DisplayConnection = BasicDisplay<BufConnection<NameConnection>>;
90
91 impl DisplayConnection {
92 pub fn connect(display: Option<&str>) -> Result<Self> {
94 let span = tracing::info_span!("connect");
95 let enter = span.enter();
96
97 crate::initialization(|| {
98 let dpy = parse_display::parse_display(display)
100 .ok_or_else(|| Error::couldnt_parse_display(display.is_none()))?;
101
102 tracing::trace!(display = ?dpy);
103
104 let screen = dpy.screen;
105 let display_num = dpy.display;
106 let conn = NameConnection::from_parsed_display(&dpy, display.is_none())?;
107
108 let (family, address) = conn.get_address()?;
110
111 let (name, data) = match xauth::get_auth(family, &address, display_num).map_err(Error::io)? {
112 Some(tuple) => tuple,
113 None => {
114 tracing::warn!("no Xauth entry found for display {}", display_num);
115
116 (vec![], vec![])
117 },
118 };
119
120 mem::drop(enter);
121
122 Self::connect_with_auth(conn.into(), screen.into(), name, data)
123 })
124 }
125 }
126}
127
128impl<Conn: Connection> BasicDisplay<Conn> {
129 pub fn with_connection(conn: Conn, setup: Setup, default_screen_index: usize) -> Result<Self> {
132 crate::initialization(move || {
133 let core = X11Core::from_setup(&setup)?;
134 let default_screen_index = default_screen_index;
135
136 Ok(Self {
137 core,
138 setup: Arc::new(setup),
139 packet_reader: PacketReader::new(),
140 conn: Poisonable::from(conn),
141 max_request_size: Some(Prefetch::default()),
142 default_screen_index,
143 extension_map: ExtensionMap::default(),
144 async_state: AsyncState::default(),
145 })
146 })
147 }
148
149 pub fn connect_with_auth(
158 mut conn: Conn,
159 default_screen_index: usize,
160 auth_name: Vec<u8>,
161 auth_info: Vec<u8>,
162 ) -> Result<Self> {
163 let span = tracing::info_span!("connect_with_auth");
164 let _enter = span.enter();
165
166 crate::initialization(move || {
167 let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_info);
169
170 tracing::debug!("writing the setup request to server");
172 let mut nwritten = 0;
173 while nwritten < setup_request.len() {
174 let n = conn.send_slice(&setup_request[nwritten..])?;
175 nwritten += n;
176
177 tracing::trace!(written = n, total = nwritten, "wrote bytes for setup",);
178 }
179
180 conn.flush()?;
181
182 tracing::debug!("reading the setup from server");
184 loop {
185 let adv = conn.recv_slice(connect.buffer())?;
186 if adv == 0 {
187 return Err(Error::make_invalid_state(InvalidState::NotEnoughSetup));
188 }
189
190 tracing::trace!(read = adv, "read bytes for setup");
191
192 if connect.advance(adv) {
193 break;
195 }
196 }
197
198 let setup = connect.into_setup().map_err(Error::make_connect_error)?;
200
201 let display = Self::with_connection(conn, setup, default_screen_index)?;
203 Ok(display)
204 })
205 }
206
207 fn wait(&mut self, strategy: &mut impl Strategy<Conn>) -> Result<AsyncStatus<()>> {
209 let span = tracing::debug_span!("wait", strategy = strategy.description());
210 let _enter = span.enter();
211
212 let mut fds = vec![];
214 let packet_reader = &mut self.packet_reader;
215 let amt = match self
216 .conn
217 .with(|conn| strategy.read_slices(conn, packet_reader.buffer(), &mut fds))
218 {
219 Ok(amt) => amt,
220 Err(e) if e.would_block() => return Ok(AsyncStatus::Read),
221 Err(e) => return Err(e),
222 };
223
224 self.core.enqueue_fds(fds);
228 if let Some(packet) = self.packet_reader.advance(amt) {
229 self.core.enqueue_packet(packet);
230 }
231
232 Ok(AsyncStatus::Ready(()))
234 }
235
236 fn fetch_reply(
239 &mut self,
240 seq: u64,
241 strategy: &mut impl Strategy<Conn>,
242 ) -> Result<AsyncStatus<RawReply>> {
243 let span = tracing::trace_span!("fetch_reply", seq = seq);
244 let _enter = span.enter();
245
246 mtry!(self.partial_flush());
248
249 loop {
250 if let Some(reply) = self.core.fetch_reply(seq, &self.extension_map)? {
252 return Ok(AsyncStatus::Ready(reply));
253 }
254
255 mtry!(self.wait(strategy));
257 }
258 }
259
260 fn fetch_event(&mut self, strategy: &mut impl Strategy<Conn>) -> Result<AsyncStatus<Event>> {
262 mtry!(self.partial_flush());
263
264 loop {
265 if let Some(event) = self.core.fetch_event(&self.extension_map)? {
267 return Ok(AsyncStatus::Ready(event));
268 }
269
270 mtry!(self.wait(strategy));
272 }
273 }
274
275 fn prefetch_maximum_length(
276 &mut self,
277 ctx: Option<&Waker>,
278 strategy: &mut impl Strategy<Conn>,
279 ) -> Result<AsyncStatus<(bool, usize)>> {
280 tracing::info!("prefetching maximum length from server");
281
282 let mut prefetch = self.max_request_size.take().unwrap();
284
285 let sz = strategy.prefetch(self, &mut prefetch, ctx).acopied();
287 self.max_request_size = Some(prefetch);
288
289 if self
290 .max_request_size
291 .as_ref()
292 .unwrap()
293 .get_if_resolved()
294 .is_some()
295 {
296 tracing::trace!("Finished bigreq setup");
297 } else {
298 tracing::debug!("bigreq incomplete: {:?}", &sz);
299 }
300
301 let sz = mtry!(sz);
302 Ok(AsyncStatus::Ready((
303 sz.is_some(),
304 sz.unwrap_or(self.setup.maximum_request_length as usize),
305 )))
306 }
307
308 fn bigreq(
309 &mut self,
310 ctx: Option<&Waker>,
311 strategy: &mut impl Strategy<Conn>,
312 ) -> Result<AsyncStatus<(bool, usize)>> {
313 let span = tracing::debug_span!("bigreq");
314 let _enter = span.enter();
315
316 loop {
317 match self
318 .max_request_size
319 .as_ref()
320 .map(|mrs| mrs.get_if_resolved().copied())
321 {
322 None => {
323 return Ok(AsyncStatus::Ready((
325 false,
326 self.setup.maximum_request_length as usize,
327 )));
328 }
329 Some(None) => {
330 mtry!(self.prefetch_maximum_length(ctx, strategy));
332 }
333 Some(Some(sz)) => {
334 return Ok(AsyncStatus::Ready((
336 sz.is_some(),
337 sz.unwrap_or(self.setup.maximum_request_length as usize),
338 )));
339 }
340 }
341 }
342 }
343
344 fn prefetch_extension(
345 &mut self,
346 name: &'static str,
347 ctx: Option<&Waker>,
348 strategy: &mut impl Strategy<Conn>,
349 ) -> Result<AsyncStatus<Option<ExtensionInformation>>> {
350 tracing::info!("prefetching extension {} from server", name);
351
352 let mut pf = match self.extension_map.take_pf(name) {
353 Some(pf) => pf,
354 None => Prefetch::new(QueryExtensionRequest {
355 name: name.as_bytes().into(),
356 }),
357 };
358
359 let res = strategy.prefetch(self, &mut pf, ctx).acopied();
361
362 self.extension_map.insert(name, pf);
364
365 res
366 }
367
368 fn extension_info(
369 &mut self,
370 name: &'static str,
371 ctx: Option<&Waker>,
372 strategy: &mut impl Strategy<Conn>,
373 ) -> Result<AsyncStatus<ExtensionInformation>> {
374 let span = tracing::debug_span!("extension_info");
375 let _enter = span.enter();
376
377 loop {
378 match self.extension_map.get(name) {
379 Some(Some(info)) => return Ok(AsyncStatus::Ready(info)),
380 Some(None) => return Err(Error::make_missing_extension(name)),
381 None => {
382 mtry!(self.prefetch_extension(name, ctx, strategy));
384 }
385 }
386 }
387 }
388
389 fn partial_flush(&mut self) -> Result<AsyncStatus<()>> {
390 tracing::trace!("flushing connection");
391
392 match self.conn.with(Connection::flush) {
393 Ok(()) => Ok(AsyncStatus::Ready(())),
394 Err(e) if e.would_block() => Ok(AsyncStatus::Write),
395 Err(e) => Err(e),
396 }
397 }
398
399 fn partial_synchronize(
401 &mut self,
402 ctx: Option<&Waker>,
403 strategy: &mut impl Strategy<Conn>,
404 ) -> Result<AsyncStatus<()>> {
405 tracing::debug!("trying for partial synchronization");
406
407 let mut pf = self.async_state.synchronization.take().unwrap_or_default();
408 let res = strategy.prefetch(self, &mut pf, ctx).acopied();
409
410 if !matches!(res.as_ref().map(AsyncStatus::is_ready), Ok(true) | Err(_)) {
411 self.async_state.synchronization = Some(pf);
412 }
413
414 tracing::trace!("finished partial synchronization");
415
416 res.map(|a| a.map(|_| ()))
417 }
418
419 fn try_format_request(
421 &mut self,
422 request: &mut RawRequest<'_, '_>,
423 ctx: Option<&Waker>,
424 strategy: &mut impl Strategy<Conn>,
425 ) -> Result<AsyncStatus<u64>> {
426 let span = tracing::debug_span!("format_request", strategy = strategy.description());
427 let _enter = span.enter();
428
429 let (is_bigreq, max_len) = mtry!(self.bigreq(ctx, strategy));
431 let extension = request.extension();
432
433 let extension_opcode = match extension {
434 Some(ext) => Some(mtry!(self.extension_info(ext, ctx, strategy)).major_opcode),
435 None => None,
436 };
437
438 let seq = loop {
440 match self.core.send_request(request.variant()) {
441 Some(seq) => break seq,
442 None => {
443 mtry!(self.partial_synchronize(ctx, strategy));
445 }
446 }
447 };
448
449 tracing::debug!(
450 seq = seq,
451 is_bigreq = is_bigreq,
452 extension_opcode = extension_opcode,
453 "formatting request for sending",
454 );
455
456 request.format(extension_opcode, max_len)?;
458
459 if let Some(mode) = request.discard_mode() {
461 self.core.discard_reply(seq, mode);
462 }
463
464 Ok(AsyncStatus::Ready(seq))
465 }
466
467 fn try_send_raw_request(&mut self, req: &mut RawRequest<'_, '_>) -> Result<AsyncStatus<()>> {
469 loop {
470 if req.is_empty() {
471 break;
472 }
473
474 let (buf, fds) = req.mut_parts();
475
476 match self.conn.with(|conn| conn.send_slices_and_fds(&**buf, fds)) {
477 Ok(nwritten) => {
478 tracing::trace!(nwritten = nwritten, "sent data to server");
479 req.advance(nwritten);
480 }
481 Err(e) if e.would_block() => {
482 return Ok(AsyncStatus::Write);
483 }
484 Err(e) => return Err(e),
485 }
486 }
487
488 Ok(AsyncStatus::Ready(()))
489 }
490
491 fn partial_error_check(
493 &mut self,
494 seq: u64,
495 ctx: Option<&Waker>,
496 strategy: &mut impl Strategy<Conn>,
497 ) -> Result<AsyncStatus<()>> {
498 while self.core.ready_for_error_check(seq) {
500 tracing::debug!("synchronizing until we are ready to check for an error");
501 mtry!(self.partial_synchronize(ctx, strategy));
502 }
503
504 loop {
506 if self.core.check_for_error(seq, &self.extension_map)? {
507 return Ok(AsyncStatus::Ready(()));
509 }
510
511 tracing::debug!("reading packets to try to get an error");
513 mtry!(self.wait(strategy));
514 }
515 }
516
517 fn partial_generate_xid(
519 &mut self,
520 ctx: Option<&Waker>,
521 strategy: &mut impl Strategy<Conn>,
522 ) -> Result<AsyncStatus<u32>> {
523 loop {
524 if let Some(id) = self.core.generate_xid() {
525 return Ok(AsyncStatus::Ready(id));
527 }
528
529 let mut pf = self.async_state.xid_regeneration.take().unwrap_or_default();
531 let res = strategy.prefetch(self, &mut pf, ctx).acopied();
532
533 if !matches!(res.as_ref().map(AsyncStatus::is_ready), Ok(true) | Err(_)) {
534 self.async_state.xid_regeneration = Some(pf);
535 }
536
537 let range = mtry!(res);
539 self.core
540 .update_xid_range(range)
541 .map_err(|IdsExhausted| Error::make_invalid_state(InvalidState::XidsExhausted))?;
542 }
543 }
544}
545
546cfg_std_unix! {
547 impl<Conn: AsRawFd> AsRawFd for BasicDisplay<Conn> {
548 fn as_raw_fd(&self) -> RawFd {
549 self.conn.with_ref(|conn| {
550 Ok(conn.as_raw_fd())
551 }).expect("`AsRawFd` impl failed because connection is poisoned")
552 }
553 }
554}
555
556cfg_std_windows! {
557 impl<Conn: AsRawSocket> AsRawSocket for BasicDisplay<Conn> {
558 fn as_raw_socket(&self) -> RawSocket {
559 self.conn.with_ref(|conn| {
560 Ok(conn.as_raw_socket())
561 }).expect("`AsRawSocket` impl failed because connection is poisoned")
562 }
563 }
564}
565
566impl<Conn: Connection> DisplayBase for BasicDisplay<Conn> {
567 fn setup(&self) -> &Arc<Setup> {
568 &self.setup
569 }
570
571 fn default_screen_index(&self) -> usize {
572 self.default_screen_index
573 }
574
575 fn poll_for_reply_raw(&mut self, seq: u64) -> Result<Option<RawReply>> {
576 self.fetch_reply(seq, &mut PollingStrategy)
577 .map(AsyncStatus::ready)
578 }
579
580 fn poll_for_event(&mut self) -> Result<Option<Event>> {
581 self.fetch_event(&mut PollingStrategy)
582 .map(AsyncStatus::ready)
583 }
584}
585
586impl<Conn: Connection> Display for BasicDisplay<Conn> {
587 fn send_request_raw(&mut self, mut req: RawRequest<'_, '_>) -> Result<u64> {
588 let span = tracing::debug_span!("send_request_raw");
589 let _enter = span.enter();
590
591 cfg_if::cfg_if! {
592 if #[cfg(feature = "async")] {
593 if let Some(seq) = self.async_state.current_sending {
595 return Err(Error::async_send_in_progress(seq))
596 }
597 }
598 }
599
600 let sequence = self
601 .try_format_request(&mut req, None, &mut BlockingStrategy)?
602 .unwrap();
603 self.try_send_raw_request(&mut req)?.unwrap();
604 Ok(sequence)
605 }
606
607 fn wait_for_reply_raw(&mut self, seq: u64) -> Result<RawReply> {
608 self.fetch_reply(seq, &mut BlockingStrategy)
609 .map(AsyncStatus::unwrap)
610 }
611
612 fn wait_for_event(&mut self) -> Result<Event> {
613 self.fetch_event(&mut BlockingStrategy)
614 .map(AsyncStatus::unwrap)
615 }
616
617 fn generate_xid(&mut self) -> Result<u32> {
618 self.partial_generate_xid(None, &mut BlockingStrategy)
619 .map(AsyncStatus::unwrap)
620 }
621
622 fn maximum_request_length(&mut self) -> Result<usize> {
623 let span = tracing::debug_span!("maximum_request_length");
624 let _enter = span.enter();
625
626 let (_, max_len) = self.bigreq(None, &mut BlockingStrategy)?.unwrap();
627 Ok(max_len)
628 }
629
630 fn flush(&mut self) -> Result<()> {
631 self.conn.with(Connection::flush)
633 }
634
635 fn check_for_error(&mut self, seq: u64) -> Result<()> {
636 self.partial_error_check(seq, None, &mut BlockingStrategy)
637 .map(AsyncStatus::unwrap)
638 }
639}
640
641cfg_async! {
642 impl<Conn: Connection> CanBeAsyncDisplay for BasicDisplay<Conn> {
643 fn format_request(
644 &mut self,
645 req: &mut RawRequest<'_, '_>,
646 ctx: &mut Context<'_>,
647 ) -> Result<AsyncStatus<u64>> {
648 self.try_format_request(req, Some(ctx.waker()), &mut NonBlockingStrategy)
649 }
650
651 fn try_send_request_raw(
652 &mut self,
653 req: &mut RawRequest<'_, '_>,
654 _ctx: &mut Context<'_>,
655 ) -> Result<AsyncStatus<()>> {
656 self.try_send_raw_request(req)
657 }
658
659 fn try_flush(&mut self, _ctx: &mut Context<'_>) -> Result<AsyncStatus<()>> {
660 self.partial_flush()
661 }
662
663 fn try_generate_xid(&mut self, ctx: &mut Context<'_>) -> Result<AsyncStatus<u32>> {
664 self.partial_generate_xid(Some(ctx.waker()), &mut NonBlockingStrategy)
665 }
666
667 fn try_maximum_request_length(&mut self, ctx: &mut Context<'_>) -> Result<AsyncStatus<usize>> {
668 let (_, max) = mtry!(self.bigreq(Some(ctx.waker()), &mut NonBlockingStrategy));
669 Ok(AsyncStatus::Ready(max))
670 }
671
672 fn try_wait_for_event(&mut self, _ctx: &mut Context<'_>) -> Result<AsyncStatus<Event>> {
673 self.fetch_event(&mut NonBlockingStrategy)
674 }
675
676 fn try_wait_for_reply_raw(
677 &mut self,
678 seq: u64,
679 _ctx: &mut Context<'_>,
680 ) -> Result<AsyncStatus<RawReply>> {
681 self.fetch_reply(seq, &mut NonBlockingStrategy)
682 }
683
684 fn try_check_for_error(
685 &mut self,
686 seq: u64,
687 ctx: &mut Context<'_>,
688 ) -> Result<AsyncStatus<()>> {
689 self.partial_error_check(seq, Some(ctx.waker()), &mut NonBlockingStrategy)
690 }
691 }
692}
693
694mod impl_details {
695 use alloc::vec::Vec;
696 use core::task::Waker;
697
698 cfg_async! {
699 use core::task::Context;
700 }
701
702 use crate::{
703 connection::Connection,
704 display::{prefetch::PrefetchTarget, AsyncStatus, Prefetch},
705 Fd, Result,
706 };
707
708 use super::BasicDisplay;
709
710 pub(crate) trait Strategy<Conn> {
713 fn read_slices(
715 &mut self,
716 conn: &mut Conn,
717 slice: &mut [u8],
718 fds: &mut Vec<Fd>,
719 ) -> Result<usize>;
720
721 fn prefetch<'p, P: PrefetchTarget>(
723 &mut self,
724 display: &mut BasicDisplay<Conn>,
725 prefetch: &'p mut Prefetch<P>,
726 ctx: Option<&Waker>,
727 ) -> Result<AsyncStatus<&'p P::Target>>;
728
729 fn description(&self) -> &'static str;
731 }
732
733 pub(crate) struct BlockingStrategy;
735
736 impl<Conn: Connection> Strategy<Conn> for BlockingStrategy {
737 fn read_slices(
738 &mut self,
739 conn: &mut Conn,
740 slice: &mut [u8],
741 fds: &mut Vec<Fd>,
742 ) -> Result<usize> {
743 conn.recv_slice_and_fds(slice, fds)
744 }
745
746 fn prefetch<'p, P: PrefetchTarget>(
747 &mut self,
748 display: &mut BasicDisplay<Conn>,
749 prefetch: &'p mut Prefetch<P>,
750 _ctx: Option<&Waker>,
751 ) -> Result<AsyncStatus<&'p P::Target>> {
752 prefetch.evaluate(display).map(AsyncStatus::Ready)
753 }
754
755 fn description(&self) -> &'static str {
756 "blocking"
757 }
758 }
759
760 pub(crate) struct PollingStrategy;
762
763 impl<Conn: Connection> Strategy<Conn> for PollingStrategy {
764 fn read_slices(
765 &mut self,
766 conn: &mut Conn,
767 slice: &mut [u8],
768 fds: &mut Vec<Fd>,
769 ) -> Result<usize> {
770 conn.non_blocking_recv_slice_and_fds(slice, fds)
771 }
772
773 fn prefetch<'p, P: PrefetchTarget>(
774 &mut self,
775 _display: &mut BasicDisplay<Conn>,
776 _prefetch: &'p mut Prefetch<P>,
777 _ctx: Option<&Waker>,
778 ) -> Result<AsyncStatus<&'p P::Target>> {
779 unreachable!()
780 }
781
782 fn description(&self) -> &'static str {
783 "polling"
784 }
785 }
786
787 cfg_async! {
788 pub(crate) struct NonBlockingStrategy;
790
791 impl<Conn: Connection> Strategy<Conn> for NonBlockingStrategy {
792 fn read_slices(
793 &mut self,
794 conn: &mut Conn,
795 slice: &mut [u8],
796 fds: &mut Vec<Fd>,
797 ) -> Result<usize> {
798 conn.non_blocking_recv_slice_and_fds(slice, fds)
799 }
800
801 fn prefetch<'p, P: PrefetchTarget>(
802 &mut self,
803 display: &mut BasicDisplay<Conn>,
804 prefetch: &'p mut Prefetch<P>,
805 ctx: Option<&Waker>,
806 ) -> Result<AsyncStatus<&'p P::Target>> {
807 let mut ctx = Context::from_waker(ctx.unwrap());
808 prefetch.try_evaluate(display, &mut ctx)
809 }
810
811 fn description(&self) -> &'static str {
812 "non-blocking"
813 }
814 }
815 }
816}