1use std::{cell::Cell, cmp, fmt, future::poll_fn, mem, ops, rc::Rc, task::Context, task::Poll};
2
3use ntex_bytes::Bytes;
4use ntex_error::Error;
5use ntex_http::{HeaderMap, StatusCode, header::CONTENT_LENGTH};
6use ntex_util::task::LocalWaker;
7
8use crate::error::{OperationError, StreamError};
9use crate::frame::{
10 Data, Headers, PseudoHeaders, Reason, Reset, StreamId, WindowSize, WindowUpdate,
11};
12use crate::{connection::Connection, frame, message::Message, window::Window};
13
/// Owning handle to a stream.
///
/// Wraps a [`StreamRef`] and dereferences to it. Dropping the handle calls
/// `reset(Reason::CANCEL)`, which only sends a reset when at least one half
/// of the stream is still open.
pub struct Stream(StreamRef);
16
/// Receive capacity held on behalf of a stream.
///
/// Whatever amount is still held when the value is dropped is handed back to
/// the stream via `StreamState::consume_capacity`.
#[derive(Debug)]
pub struct Capacity {
    /// Number of bytes currently held by this value.
    size: Cell<u32>,
    /// Stream whose receive accounting this capacity belongs to.
    stream: Rc<StreamState>,
}
23
24impl Capacity {
25 fn new(size: u32, stream: &Rc<StreamState>) -> Self {
26 stream.add_recv_capacity(size);
27
28 Self {
29 size: Cell::new(size),
30 stream: stream.clone(),
31 }
32 }
33
34 #[inline]
35 pub fn size(&self) -> usize {
37 self.size.get() as usize
38 }
39
40 pub fn consume(&self, sz: u32) {
46 let size = self.size.get();
47 if let Some(sz) = size.checked_sub(sz) {
48 log::trace!(
49 "{}: {:?} capacity consumed from {} to {}",
50 self.stream.tag(),
51 self.stream.id,
52 size,
53 sz
54 );
55 self.size.set(sz);
56 self.stream.consume_capacity(size - sz);
57 } else {
58 panic!("Capacity overflow");
59 }
60 }
61}
62
63impl ops::Add for Capacity {
65 type Output = Self;
66
67 fn add(self, other: Self) -> Self {
68 if Rc::ptr_eq(&self.stream, &other.stream) {
69 let size = Cell::new(self.size.get() + other.size.get());
70 self.size.set(0);
71 other.size.set(0);
72 Self {
73 size,
74 stream: self.stream.clone(),
75 }
76 } else {
77 panic!("Cannot add capacity from different streams");
78 }
79 }
80}
81
82impl ops::AddAssign for Capacity {
84 fn add_assign(&mut self, other: Self) {
85 if Rc::ptr_eq(&self.stream, &other.stream) {
86 let size = self.size.get() + other.size.get();
87 self.size.set(size);
88 other.size.set(0);
89 } else {
90 panic!("Cannot add capacity from different streams");
91 }
92 }
93}
94
95impl Drop for Capacity {
96 fn drop(&mut self) {
97 let size = self.size.get();
98 if size > 0 {
99 self.stream.consume_capacity(size);
100 }
101 }
102}
103
/// Tracking of the payload length announced for the receive side.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) enum ContentLength {
    /// No length is tracked; this is the value a new stream starts with.
    Omitted,
    /// Set by `send_headers` when an informational status is sent; while in
    /// this state `recv_data` rejects non-empty DATA payloads.
    Head,
    /// Bytes still expected; initialised from the `content-length` header and
    /// decremented by every received DATA frame.
    Remaining(u64),
}
111
/// Reference-counted handle to the shared state of a stream.
///
/// Cloning only clones the inner `Rc`; unlike [`Stream`], dropping a
/// `StreamRef` does not reset the stream.
#[derive(Clone, Debug)]
pub struct StreamRef(pub(crate) Rc<StreamState>);
114
bitflags::bitflags! {
    // Per-stream boolean markers stored in `StreamState::flags`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct StreamFlags: u8 {
        // Set at construction when `StreamRef::new` is called with `remote == true`.
        const REMOTE = 0b0000_0001;
        // Set by `StreamState::failed`.
        const FAILED = 0b0000_0010;
        // Set by `StreamRef::disconnect_on_drop`; only stored and queried in this file.
        const DISCONNECT_ON_DROP = 0b0000_0100;
        // Set while `poll_send_capacity` is pending, cleared once capacity is available.
        const WAIT_FOR_CAPACITY = 0b0000_1000;
    }
}
124
/// Shared, interior-mutable state of a single stream.
pub(crate) struct StreamState {
    /// Identifier of the stream.
    id: StreamId,
    /// Boolean markers, see `StreamFlags`.
    flags: Cell<StreamFlags>,
    /// Payload length tracking for received DATA frames.
    content_length: Cell<ContentLength>,
    /// State of the receive half.
    recv: Cell<HalfState>,
    /// Receive flow-control window.
    recv_window: Cell<Window>,
    /// Receive capacity currently outstanding: increased by
    /// `add_recv_capacity`, decreased by `consume_capacity`.
    recv_size: Cell<u32>,
    /// State of the send half.
    send: Cell<HalfState>,
    /// Send flow-control window.
    send_window: Cell<Window>,
    /// Waker registered by `poll_send_capacity`.
    send_cap: LocalWaker,
    /// Waker registered by `poll_send_reset`.
    send_reset: LocalWaker,
    /// Connection this stream belongs to.
    pub(crate) con: Connection,
    /// Error recorded by a reset or failure; reported by `check_error`.
    error: Cell<Option<Error<OperationError>>>,
}
144
/// State of one direction (send or receive) of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HalfState {
    /// Initial state of both halves of a new stream.
    Idle,
    /// Headers without end-of-stream were processed; payload may follow.
    Payload,
    /// The half is closed, optionally carrying the reset reason.
    Closed(Option<Reason>),
}
151
152impl HalfState {
153 pub(crate) fn is_closed(self) -> bool {
154 matches!(self, HalfState::Closed(_))
155 }
156}
157
158impl StreamState {
159 fn tag(&self) -> &'static str {
160 self.con.tag()
161 }
162
163 fn state_send_payload(&self) {
164 self.send.set(HalfState::Payload);
165 }
166
167 fn state_send_close(&self, reason: Option<Reason>) {
168 log::trace!(
169 "{}: {:?} send side is closed with reason {:?}",
170 self.tag(),
171 self.id,
172 reason
173 );
174 self.send.set(HalfState::Closed(reason));
175 self.send_cap.wake();
176 self.review_state();
177 }
178
179 fn state_recv_payload(&self) {
180 self.recv.set(HalfState::Payload);
181 }
182
183 fn state_recv_close(&self, reason: Option<Reason>) {
184 log::trace!("{}: {:?} receive side is closed", self.tag(), self.id);
185 self.recv.set(HalfState::Closed(reason));
186 self.review_state();
187 }
188
189 fn reset_stream(&self, reason: Option<Reason>) {
190 self.recv.set(HalfState::Closed(reason));
191 self.send.set(HalfState::Closed(None));
192 if let Some(reason) = reason {
193 self.error.set(Some(Error::new(
194 OperationError::LocalReset(reason),
195 self.con.service(),
196 )));
197 }
198 self.review_state();
199 }
200
201 fn remote_reset_stream(&self, reason: Reason) {
202 self.recv.set(HalfState::Closed(None));
203 self.send.set(HalfState::Closed(Some(reason)));
204 self.error.set(Some(Error::new(
205 OperationError::RemoteReset(reason),
206 self.con.service(),
207 )));
208 self.review_state();
209 }
210
211 fn failed(&self, err: Error<OperationError>) {
212 if !self.recv.get().is_closed() {
213 self.recv.set(HalfState::Closed(None));
214 }
215 if !self.send.get().is_closed() {
216 self.send.set(HalfState::Closed(None));
217 }
218 self.error.set(Some(err));
219 self.insert_flag(StreamFlags::FAILED);
220 self.review_state();
221 }
222
223 fn insert_flag(&self, f: StreamFlags) {
224 let mut flags = self.flags.get();
225 flags.insert(f);
226 self.flags.set(flags);
227 }
228
229 fn remove_flag(&self, f: StreamFlags) {
230 let mut flags = self.flags.get();
231 flags.remove(f);
232 self.flags.set(flags);
233 }
234
235 fn check_error(&self) -> Result<(), Error<OperationError>> {
236 if let Some(err) = self.error.take() {
237 self.error.set(Some(err.clone()));
238 Err(err)
239 } else {
240 Ok(())
241 }
242 }
243
244 fn review_state(&self) {
245 if self.recv.get().is_closed() {
246 self.send_reset.wake();
247
248 if let HalfState::Closed(reason) = self.send.get() {
249 if let Some(reason) = reason {
251 log::trace!(
252 "{}: {:?} is closed with remote reset {:?}, dropping stream",
253 self.tag(),
254 self.id,
255 reason
256 );
257 } else {
258 log::trace!(
259 "{}: {:?} both sides are closed, dropping stream",
260 self.tag(),
261 self.id
262 );
263 }
264 self.send_cap.wake();
265 self.con.drop_stream(self.id);
266 }
267 }
268 }
269
270 fn add_recv_capacity(&self, size: u32) {
272 let cap = self.recv_size.get();
273 self.recv_size.set(cap + size);
274 self.recv_window.set(self.recv_window.get().dec(size));
275 log::trace!(
276 "{}: {:?} capacity incresed from {} to {}",
277 self.tag(),
278 self.id,
279 cap,
280 cap + size
281 );
282
283 self.con.add_recv_capacity(size);
285 }
286
287 fn consume_capacity(&self, size: u32) {
289 let cap = self.recv_size.get();
290 let size = cap - size;
291 log::trace!(
292 "{}: {:?} capacity decresed from {} to {}",
293 self.tag(),
294 self.id,
295 cap,
296 size
297 );
298
299 self.recv_size.set(size);
300 let mut window = self.recv_window.get();
301 if let Some(val) = window.update(
302 size,
303 self.con.config().window_sz,
304 self.con.config().window_sz_threshold,
305 ) {
306 log::trace!(
307 "{}: {:?} capacity decresed below threshold {} increase by {} ({})",
308 self.tag(),
309 self.id,
310 self.con.config().window_sz_threshold,
311 val,
312 self.con.config().window_sz,
313 );
314 self.recv_window.set(window);
315 self.con.encode(WindowUpdate::new(self.id, val));
316 }
317 }
318}
319
320impl StreamRef {
321 pub(crate) fn new(id: StreamId, remote: bool, con: Connection) -> Self {
322 let recv_window = if con.settings_processed() {
325 Window::new(con.config().window_sz)
326 } else {
327 Window::new(frame::DEFAULT_INITIAL_WINDOW_SIZE)
328 };
329 let send_window = Window::new(con.remote_window_size());
330
331 StreamRef(Rc::new(StreamState {
332 id,
333 con,
334 recv: Cell::new(HalfState::Idle),
335 recv_window: Cell::new(recv_window),
336 recv_size: Cell::new(0),
337 send: Cell::new(HalfState::Idle),
338 send_window: Cell::new(send_window),
339 send_cap: LocalWaker::new(),
340 send_reset: LocalWaker::new(),
341 error: Cell::new(None),
342 content_length: Cell::new(ContentLength::Omitted),
343 flags: Cell::new(if remote {
344 StreamFlags::REMOTE
345 } else {
346 StreamFlags::empty()
347 }),
348 }))
349 }
350
351 #[inline]
352 pub fn id(&self) -> StreamId {
353 self.0.id
354 }
355
356 #[inline]
357 pub fn tag(&self) -> &'static str {
358 self.0.con.tag()
359 }
360
361 #[inline]
363 pub fn is_remote(&self) -> bool {
364 self.0.flags.get().contains(StreamFlags::REMOTE)
365 }
366
367 #[inline]
369 pub fn is_failed(&self) -> bool {
370 self.0.flags.get().contains(StreamFlags::FAILED)
371 }
372
373 pub(crate) fn service(&self) -> &'static str {
374 self.0.con.service()
375 }
376
377 pub(crate) fn send_state(&self) -> HalfState {
378 self.0.send.get()
379 }
380
381 pub(crate) fn recv_state(&self) -> HalfState {
382 self.0.recv.get()
383 }
384
385 pub(crate) fn disconnect_on_drop(&self) {
386 self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
387 }
388
389 pub(crate) fn is_disconnect_on_drop(&self) -> bool {
390 self.0.flags.get().contains(StreamFlags::DISCONNECT_ON_DROP)
391 }
392
393 #[inline]
398 pub fn reset(&self, reason: Reason) -> bool {
399 if !self.0.recv.get().is_closed() || !self.0.send.get().is_closed() {
400 self.0.con.encode(Reset::new(self.0.id, reason));
401 self.0.reset_stream(Some(reason));
402 true
403 } else {
404 false
405 }
406 }
407
408 #[inline]
410 pub fn empty_capacity(&self) -> Capacity {
411 Capacity {
412 size: Cell::new(0),
413 stream: self.0.clone(),
414 }
415 }
416
    /// Wrap this reference into an owning [`Stream`] handle.
    #[inline]
    pub(crate) fn into_stream(self) -> Stream {
        Stream(self)
    }
421
422 pub(crate) fn send_headers(&self, mut hdrs: Headers) {
423 hdrs.set_end_headers();
424 if hdrs.is_end_stream() {
425 self.0.state_send_close(None);
426 } else {
427 self.0.state_send_payload();
428 }
429 log::trace!(
430 "{}: send headers {:#?} eos: {:?}",
431 self.tag(),
432 hdrs,
433 hdrs.is_end_stream()
434 );
435
436 if hdrs
437 .pseudo()
438 .status
439 .is_some_and(|status| status.is_informational())
440 {
441 self.0.content_length.set(ContentLength::Head);
442 }
443 self.0.con.encode(hdrs);
444 }
445
446 pub(crate) fn set_go_away(&self, reason: Reason) {
447 self.0.remote_reset_stream(reason);
448 }
449
450 pub(crate) fn set_failed_stream(&self, err: Error<OperationError>) {
451 self.0.failed(err);
452 }
453
454 pub(crate) fn recv_headers(
455 &self,
456 hdrs: Headers,
457 ) -> Result<Option<Message>, Error<StreamError>> {
458 log::trace!(
459 "{}: processing HEADERS for {:?}:\n{:#?}\nrecv_state:{:?}, send_state: {:?}",
460 self.tag(),
461 self.0.id,
462 hdrs,
463 self.0.recv.get(),
464 self.0.send.get(),
465 );
466
467 match self.0.recv.get() {
468 HalfState::Idle => {
469 let eof = hdrs.is_end_stream();
470 if eof {
471 self.0.state_recv_close(None);
472 } else {
473 self.0.state_recv_payload();
474 }
475 let (pseudo, headers) = hdrs.into_parts();
476
477 if self.0.content_length.get() != ContentLength::Head
478 && let Some(content_length) = headers.get(CONTENT_LENGTH)
479 {
480 if let Some(v) = parse_u64(content_length.as_bytes()) {
481 self.0.content_length.set(ContentLength::Remaining(v));
482 } else {
483 proto_err!(stream: "could not parse content-length; stream={:?}", self.0.id);
484 return Err(Error::new(
485 StreamError::InvalidContentLength,
486 self.service(),
487 ));
488 }
489 }
490 Ok(Some(Message::new(pseudo, headers, eof, self)))
491 }
492 HalfState::Payload => {
493 if hdrs.is_end_stream() {
495 self.0.state_recv_close(None);
496 Ok(Some(Message::trailers(hdrs.into_fields(), self)))
497 } else {
498 Err(Error::new(StreamError::TrailersWithoutEos, self.service()))
499 }
500 }
501 HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
502 }
503 }
504
505 pub(crate) fn recv_data(&self, data: Data) -> Result<Option<Message>, Error<StreamError>> {
506 let cap = Capacity::new(data.payload().len() as u32, &self.0);
507 log::trace!(
508 "{}: processing DATA frame for {:?}, len: {:?}",
509 self.tag(),
510 self.0.id,
511 data.payload().len()
512 );
513
514 match self.0.recv.get() {
515 HalfState::Payload => {
516 let eof = data.is_end_stream();
517
518 match self.0.content_length.get() {
520 ContentLength::Remaining(rem) => {
521 match rem.checked_sub(data.payload().len() as u64) {
522 Some(val) => {
523 self.0.content_length.set(ContentLength::Remaining(val));
524 if eof && val != 0 {
525 return Err(Error::new(
526 StreamError::WrongPayloadLength,
527 self.service(),
528 ));
529 }
530 }
531 None => {
532 return Err(Error::new(
533 StreamError::WrongPayloadLength,
534 self.service(),
535 ));
536 }
537 }
538 }
539 ContentLength::Head => {
540 if !data.payload().is_empty() {
541 return Err(Error::new(StreamError::NonEmptyPayload, self.service()));
542 }
543 }
544 ContentLength::Omitted => (),
545 }
546
547 if eof {
548 self.0.state_recv_close(None);
549 Ok(Some(Message::eof_data(data.into_payload(), self)))
550 } else {
551 Ok(Some(Message::data(data.into_payload(), cap, self)))
552 }
553 }
554 HalfState::Idle => Err(Error::new(
555 StreamError::Idle("DATA framed received"),
556 self.service(),
557 )),
558 HalfState::Closed(_) => Err(Error::new(StreamError::Closed, self.service())),
559 }
560 }
561
562 pub(crate) fn recv_rst_stream(&self, frm: Reset) {
563 self.0.remote_reset_stream(frm.reason());
564 }
565
566 pub(crate) fn recv_window_update_connection(&self) {
567 if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
568 && self.0.send_window.get().window_size() > 0
569 {
570 self.0.send_cap.wake();
571 }
572 }
573
574 pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), Error<StreamError>> {
575 if frm.size_increment() == 0 {
576 Err(Error::new(
577 StreamError::WindowZeroUpdateValue,
578 self.service(),
579 ))
580 } else {
581 let window = self
582 .0
583 .send_window
584 .get()
585 .inc(frm.size_increment())
586 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?;
587 self.0.send_window.set(window);
588
589 if window.window_size() > 0 {
590 self.0.send_cap.wake();
591 }
592 Ok(())
593 }
594 }
595
596 pub(crate) fn update_send_window(&self, upd: i32) -> Result<(), Error<StreamError>> {
597 let orig = self.0.send_window.get();
598 let window = match upd.cmp(&0) {
599 cmp::Ordering::Less => orig.dec(upd.unsigned_abs()), cmp::Ordering::Greater => orig
601 .inc(upd)
602 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
603 cmp::Ordering::Equal => return Ok(()),
604 };
605 log::trace!(
606 "{}: Updating send window size from {} to {}",
607 self.tag(),
608 orig.window_size,
609 window.window_size
610 );
611 self.0.send_window.set(window);
612 Ok(())
613 }
614
615 pub(crate) fn update_recv_window(
616 &self,
617 upd: i32,
618 ) -> Result<Option<WindowSize>, Error<StreamError>> {
619 let mut window = match upd.cmp(&0) {
620 cmp::Ordering::Less => self.0.recv_window.get().dec(upd.unsigned_abs()), cmp::Ordering::Greater => self
622 .0
623 .recv_window
624 .get()
625 .inc(upd)
626 .map_err(|()| Error::new(StreamError::WindowOverflowed, self.service()))?,
627 cmp::Ordering::Equal => return Ok(None),
628 };
629 if let Some(val) = window.update(
630 self.0.recv_size.get(),
631 self.0.con.config().window_sz,
632 self.0.con.config().window_sz_threshold,
633 ) {
634 self.0.recv_window.set(window);
635 Ok(Some(val))
636 } else {
637 self.0.recv_window.set(window);
638 Ok(None)
639 }
640 }
641
642 pub fn send_response(
644 &self,
645 status: StatusCode,
646 headers: HeaderMap,
647 eof: bool,
648 ) -> Result<(), Error<OperationError>> {
649 self.0.check_error()?;
650
651 match self.0.send.get() {
652 HalfState::Idle => {
653 let pseudo = PseudoHeaders::response(status);
654 let mut hdrs = Headers::new(self.0.id, pseudo, headers, eof);
655
656 if eof {
657 hdrs.set_end_stream();
658 self.0.state_send_close(None);
659 } else {
660 self.0.state_send_payload();
661 }
662 self.0.con.encode(hdrs);
663 Ok(())
664 }
665 HalfState::Payload => Err(Error::new(OperationError::Payload, self.0.con.service())),
666 HalfState::Closed(r) => {
667 Err(Error::new(OperationError::Closed(r), self.0.con.service()))
668 }
669 }
670 }
671
    /// Send `res` as one or more DATA frames, waiting for send capacity
    /// whenever none is available.
    ///
    /// With `eof` set, the last frame is marked end-of-stream and the send
    /// half is closed.
    ///
    /// # Errors
    ///
    /// `Idle` when headers were not sent yet, `Closed` when the send half is
    /// closed, the recorded stream error if there is one, and any error
    /// surfaced by `send_capacity` while waiting.
    pub async fn send_payload(
        &self,
        mut res: Bytes,
        eof: bool,
    ) -> Result<(), Error<OperationError>> {
        match self.0.send.get() {
            HalfState::Payload => {
                // surface a previously recorded stream error before sending
                self.0.check_error()?;

                log::trace!(
                    "{}: {:?} sending {} bytes, eof: {}, send: {:?}",
                    self.0.tag(),
                    self.0.id,
                    res.len(),
                    eof,
                    self.0.send.get()
                );

                // an empty final chunk is sent right away, without consulting
                // the send window
                if eof && res.is_empty() {
                    let mut data = Data::new(self.0.id, Bytes::new());
                    data.set_end_stream();
                    self.0.state_send_close(None);

                    self.0.con.encode(data);
                    return Ok(());
                }

                loop {
                    let win = self.available_send_capacity() as usize;
                    if win > 0 {
                        // frame size is bounded by the window, the remaining
                        // payload and the peer's frame size
                        let size =
                            cmp::min(win, cmp::min(res.len(), self.0.con.remote_frame_size()));
                        let mut data = if size >= res.len() {
                            // the rest fits into a single frame; `res` is left empty
                            Data::new(self.0.id, mem::replace(&mut res, Bytes::new()))
                        } else {
                            log::trace!(
                                "{}: {:?} sending {} out of {} bytes",
                                self.0.tag(),
                                self.0.id,
                                size,
                                res.len()
                            );
                            Data::new(self.0.id, res.split_to(size))
                        };
                        // only the frame that drains `res` carries end-of-stream
                        if eof && res.is_empty() {
                            data.set_end_stream();
                            self.0.state_send_close(None);
                        }

                        // charge the stream-level window
                        self.0
                            .send_window
                            .set(self.0.send_window.get().dec(size as u32));

                        // charge the connection-level window
                        self.0.con.consume_send_window(size as u32);

                        self.0.con.encode(data);
                        if res.is_empty() {
                            return Ok(());
                        }
                    } else {
                        log::trace!(
                            "{}: Not enough sending capacity for {:?} remaining {:?}",
                            self.0.tag(),
                            self.0.id,
                            res.len()
                        );
                        // suspend until capacity is available or an error is recorded
                        self.send_capacity().await?;
                    }
                }
            }
            HalfState::Idle => Err(Error::new(OperationError::Idle, self.0.con.service())),
            HalfState::Closed(reason) => Err(Error::new(
                OperationError::Closed(reason),
                self.0.con.service(),
            )),
        }
    }
758
759 pub fn send_trailers(&self, map: HeaderMap) {
761 if self.0.send.get() == HalfState::Payload {
762 let mut hdrs = Headers::trailers(self.0.id, map);
763 hdrs.set_end_headers();
764 hdrs.set_end_stream();
765 self.0.con.encode(hdrs);
766 self.0.state_send_close(None);
767 }
768 }
769
770 pub fn available_send_capacity(&self) -> WindowSize {
771 cmp::min(
772 self.0.send_window.get().window_size(),
773 self.0.con.send_window_size(),
774 )
775 }
776
777 pub async fn send_capacity(&self) -> Result<WindowSize, Error<OperationError>> {
778 poll_fn(|cx| self.poll_send_capacity(cx)).await
779 }
780
781 pub fn poll_send_capacity(
783 &self,
784 cx: &Context<'_>,
785 ) -> Poll<Result<WindowSize, Error<OperationError>>> {
786 self.0.check_error()?;
787 self.0.con.check_error()?;
788
789 let win = self.available_send_capacity();
790 if win > 0 {
791 self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
792 Poll::Ready(Ok(win))
793 } else {
794 self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
795 self.0.send_cap.register(cx.waker());
796 Poll::Pending
797 }
798 }
799
800 pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), Error<OperationError>>> {
802 if self.0.send.get().is_closed() {
803 Poll::Ready(Ok(()))
804 } else {
805 self.0.check_error()?;
806 self.0.con.check_error()?;
807 self.0.send_reset.register(cx.waker());
808 Poll::Pending
809 }
810 }
811}
812
813impl PartialEq for StreamRef {
814 fn eq(&self, other: &StreamRef) -> bool {
815 Rc::as_ptr(&self.0) == Rc::as_ptr(&other.0)
816 }
817}
818
819impl ops::Deref for Stream {
820 type Target = StreamRef;
821
822 #[inline]
823 fn deref(&self) -> &Self::Target {
824 &self.0
825 }
826}
827
828impl Drop for Stream {
829 fn drop(&mut self) {
830 self.0.reset(Reason::CANCEL);
831 }
832}
833
834impl fmt::Debug for Stream {
835 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
836 let mut builder = f.debug_struct("Stream");
837 builder
838 .field("stream_id", &self.0.0.id)
839 .field("recv_state", &self.0.0.recv.get())
840 .field("send_state", &self.0.0.send.get())
841 .finish()
842 }
843}
844
845impl fmt::Debug for StreamState {
846 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
847 let mut builder = f.debug_struct("StreamState");
848 builder
849 .field("id", &self.id)
850 .field("recv", &self.recv.get())
851 .field("recv_window", &self.recv_window.get())
852 .field("recv_size", &self.recv_size.get())
853 .field("send", &self.send.get())
854 .field("send_window", &self.send_window.get())
855 .field("flags", &self.flags.get())
856 .finish()
857 }
858}
859
860pub(super) fn parse_u64(src: &[u8]) -> Option<u64> {
861 if src.len() > 19 {
862 None
864 } else {
865 let mut ret = 0;
866 for &d in src {
867 if !d.is_ascii_digit() {
868 return None;
869 }
870
871 ret *= 10;
872 ret += u64::from(d - b'0');
873 }
874
875 Some(ret)
876 }
877}