citadel_proto 0.3.0

Networking library for the Citadel Protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
use citadel_crypt::stacked_ratchet::StackedRatchet;
use citadel_wire::udp_traversal::linear::encrypted_config_container::EncryptedConfigContainer;
use citadel_wire::udp_traversal::targetted_udp_socket_addr::HolePunchedUdpSocket;
use netbeam::sync::RelativeNodeType;

use crate::constants::HOLE_PUNCH_SYNC_TIME_MULTIPLIER;
use crate::error::NetworkError;
use crate::proto::misc::udp_internal_interface::{
    QuicUdpSocketConnector, RawUdpSocketConnector, UdpSplittableTypes,
};
use crate::proto::packet::packet_flags::payload_identifiers;
use crate::proto::packet_crafter::peer_cmd::C2S_ENCRYPTION_ONLY;
use crate::proto::peer::hole_punch_compat_sink_stream::ReliableOrderedCompatStream;
use crate::proto::peer::peer_layer::UdpMode;
use crate::proto::state_container::{StateContainerInner, VirtualTargetType};

use super::includes::*;
use crate::proto::node_result::ConnectFail;
use crate::proto::packet_processor::primary_group_packet::get_proper_hyper_ratchet;
use crate::proto::state_subcontainers::preconnect_state_container::UdpChannelSender;
use citadel_wire::exports::Connection;
use citadel_wire::udp_traversal::udp_hole_puncher::EndpointHolePunchExt;
use netbeam::sync::network_endpoint::NetworkEndpoint;
use std::sync::atomic::Ordering;

/// Handles preconnect packets. Handles the NAT traversal
///
/// If the session is not provisional, the packet is dropped and
/// `PrimaryProcessorResult::Void` is returned. Otherwise the packet is parsed and the
/// work is dispatched on `header.cmd_aux`:
///
/// * `SYN`: logs a warning when `proto_version_out_of_sync` reports a mismatch, then
///   replies with a HALT packet if a session for the CID is already active, if the CID
///   is not registered, or if `validate_syn` fails. On success the generated ratchet,
///   keep-alive timeout, UDP mode, CNAC and session security settings are stored in the
///   state container, `last_stage` becomes `SYN_ACK`, and a SYN_ACK packet is returned.
/// * `SYN_ACK`: requires `last_stage == SYN_ACK`. After `validate_syn_ack` succeeds:
///   with UDP disabled a stage-0 packet is returned directly; with a stored QUIC
///   connection NAT traversal is skipped and `send_success_as_initiator` is called;
///   otherwise the stage-0 packet is sent over the primary stream and a UDP hole punch
///   is attempted as `RelativeNodeType::Initiator`. `send_success_as_initiator` is
///   called whether the hole punch succeeds or fails (with `None` on failure).
/// * `STAGE0`: requires `last_stage == SYN_ACK`. After `validate_stage0` succeeds: with
///   UDP disabled a BEGIN_CONNECT packet is returned; otherwise a hole punch is
///   attempted as `RelativeNodeType::Receiver`. On failure UDP is disabled,
///   `last_stage` becomes `SUCCESS` and nothing is sent back.
/// * `SUCCESS` / `FAILURE`: after AEAD validation, replies with BEGIN_CONNECT, except
///   that a `FAILURE` packet which is not flagged `TCP_ONLY` reports a `ConnectFail`
///   to the kernel and ends the session.
/// * `BEGIN_CONNECT`: requires `last_stage == SUCCESS`; after AEAD validation hands
///   over to `begin_connect_process`.
/// * `HALT`: forwards the payload (as UTF-8 text) to the kernel as a `ConnectFail` and
///   ends the session.
/// * Any other value is logged and dropped.
///
/// # Parameters
/// * `session_orig` - the session the packet arrived on; it is cloned into the task.
/// * `packet` - the received preconnect packet.
/// * `header_drill_vers` - version passed to `get_proper_hyper_ratchet` in the
///   `SUCCESS`/`FAILURE` and `BEGIN_CONNECT` arms.
///
/// # Errors
/// Propagates any `NetworkError` raised through `?` inside the task (account lookup,
/// protocol version check, primary stream send, endpoint registration, kernel send).
#[cfg_attr(feature = "localhost-testing", tracing::instrument(target = "citadel", skip_all, ret, err, fields(is_server = session_orig.is_server, src = packet.parse().unwrap().0.session_cid.get(), target = packet.parse().unwrap().0.target_cid.get())))]
pub async fn process_preconnect(
    session_orig: &HdpSession,
    packet: HdpPacket,
    header_drill_vers: u32,
) -> Result<PrimaryProcessorResult, NetworkError> {
    // An owned clone is taken so it can be moved into the `async move` task below
    let session = session_orig.clone();

    if !session.is_provisional() {
        log::error!(target: "citadel", "Pre-Connect packet received, but the system is not in a provisional state. Dropping");
        return Ok(PrimaryProcessorResult::Void);
    }

    let task = async move {
        let session = &session;
        let (header_main, payload) = return_if_none!(packet.parse(), "Unable to parse packet");
        let header = header_main;
        let security_level = header.security_level.into();

        match header.cmd_aux {
            packet_flags::cmd::aux::do_preconnect::SYN => {
                log::trace!(target: "citadel", "RECV STAGE SYN PRE_CONNECT PACKET");
                // TODO: prevent logins if semvers out of sync. For now, don't
                let adjacent_proto_version = header.protocol_version.get();
                if proto_version_out_of_sync(adjacent_proto_version)? {
                    log::warn!(target: "citadel", "\nLocal protocol version: {} | Adjacent protocol version: {} | Versions out of sync; program may not function\n", *crate::constants::PROTOCOL_VERSION, adjacent_proto_version);
                    // TODO: protocol translations for inter-version compatibility
                }
                // first make sure the cid isn't already connected
                let session_already_active = session
                    .session_manager
                    .session_active(header.session_cid.get());
                let account_manager = session.account_manager.clone();
                let header_if_err_occurs = header.clone();

                // Turns a `NetworkError` into a HALT packet that is sent back to the sender
                let error = |err: NetworkError| {
                    let packet = packet_crafter::pre_connect::craft_halt(
                        &header_if_err_occurs,
                        err.into_string(),
                    );
                    Ok(PrimaryProcessorResult::ReplyToSender(packet))
                };

                if session_already_active {
                    return error(NetworkError::InvalidRequest("Session Already Connected"));
                }

                if let Some(cnac) = account_manager
                    .get_client_by_cid(header.session_cid.get())
                    .await?
                {
                    // Acquired only after the `.await` above has completed
                    let mut state_container = inner_mut_state!(session.state_container);

                    match validation::pre_connect::validate_syn(
                        &cnac,
                        packet,
                        &session.session_manager,
                    ) {
                        Ok((
                            static_aux_ratchet,
                            transfer,
                            session_security_settings,
                            peer_only_connect_mode,
                            udp_mode,
                            kat,
                            nat_type,
                            new_hyper_ratchet,
                        )) => {
                            session.adjacent_nat_type.set_once(Some(nat_type));
                            state_container.pre_connect_state.generated_ratchet =
                                Some(new_hyper_ratchet);
                            // since the SYN's been validated, the CNACs toolset has been updated
                            let new_session_sec_lvl = transfer.security_level;

                            log::trace!(target: "citadel", "Synchronizing toolsets. UDP mode: {:?}. Session security level: {:?}", udp_mode, new_session_sec_lvl);
                            // TODO: Rate limiting to prevent SYN flooding
                            let timestamp = session.time_tracker.get_global_time_ns();

                            state_container.pre_connect_state.on_packet_received();

                            state_container.pre_connect_state.last_stage =
                                packet_flags::cmd::aux::do_preconnect::SYN_ACK;
                            state_container.keep_alive_timeout_ns = kat;

                            // here, we also send the peer's external address to itself
                            // Also, we use the security level that was created on init b/c the other side still uses the static aux ratchet
                            let syn_ack = packet_crafter::pre_connect::craft_syn_ack(
                                &static_aux_ratchet,
                                transfer,
                                session.local_nat_type.clone(),
                                timestamp,
                                security_level,
                            );

                            state_container.udp_mode = udp_mode;
                            state_container.cnac = Some(cnac);
                            state_container.session_security_settings =
                                Some(session_security_settings);
                            session
                                .peer_only_connect_protocol
                                .set(Some(peer_only_connect_mode));

                            Ok(PrimaryProcessorResult::ReplyToSender(syn_ack))
                        }

                        Err(err) => {
                            log::error!(target: "citadel", "Invalid SYN packet received: {:?}", &err);
                            error(err)
                        }
                    }
                } else {
                    // Unknown CID: tell the sender to halt
                    let bad_cid = header.session_cid.get();
                    let error = format!("CID {} is not registered to this node", bad_cid);
                    let packet = packet_crafter::pre_connect::craft_halt(&header, error);
                    Ok(PrimaryProcessorResult::ReplyToSender(packet))
                }
            }

            packet_flags::cmd::aux::do_preconnect::SYN_ACK => {
                log::trace!(target: "citadel", "RECV STAGE SYN_ACK PRE_CONNECT PACKET");
                let cnac = &(return_if_none!(
                    inner_state!(session.state_container).cnac.clone(),
                    "SESS Cnac not loaded"
                ));
                let implicated_cid = header.session_cid.get();

                // The `state_container` binding is confined to this block, so it is dropped
                // before the `.await`s that follow the block
                let (stream, new_hyper_ratchet) = {
                    let mut state_container = inner_mut_state!(session.state_container);
                    if state_container.pre_connect_state.last_stage
                        == packet_flags::cmd::aux::do_preconnect::SYN_ACK
                    {
                        // cnac should already be loaded locally
                        let alice_constructor = return_if_none!(
                            state_container.pre_connect_state.constructor.take(),
                            "Alice constructor not loaded"
                        );
                        let implicated_cid = header.session_cid.get();
                        if let Some((new_hyper_ratchet, nat_type)) =
                            validation::pre_connect::validate_syn_ack(
                                cnac,
                                alice_constructor,
                                packet,
                            )
                        {
                            // The toolset, at this point, has already been updated. The CNAC can be used to
                            //let ref drill = cnac.get_drill_blocking(None)?;
                            session.adjacent_nat_type.set_once(Some(nat_type));
                            state_container.pre_connect_state.generated_ratchet =
                                Some(new_hyper_ratchet.clone());

                            let local_node_type = session.local_node_type;
                            let timestamp = session.time_tracker.get_global_time_ns();
                            //let local_bind_addr = session.local_bind_addr.ip();
                            //let local_bind_addr = session.implicated_user_p2p_internal_listener_addr.clone()?;

                            // UDP disabled: no traversal needed, reply with stage 0 right away
                            if state_container.udp_mode == UdpMode::Disabled {
                                let stage0_preconnect_packet =
                                    packet_crafter::pre_connect::craft_stage0(
                                        &new_hyper_ratchet,
                                        timestamp,
                                        local_node_type,
                                        security_level,
                                    );
                                state_container.pre_connect_state.last_stage =
                                    packet_flags::cmd::aux::do_preconnect::SUCCESS;
                                return Ok(PrimaryProcessorResult::ReplyToSender(
                                    stage0_preconnect_packet,
                                ));
                            }

                            // another check. If we are already using a QUIC connection for the primary stream, we don't need to hole-punch.
                            if let Some(quic_conn) =
                                inner_mut!(session.primary_stream_quic_conn).take()
                            {
                                log::trace!(target: "citadel", "Skipping NAT traversal since QUIC is enabled for this session");
                                return send_success_as_initiator(
                                    Some(get_quic_udp_interface(
                                        quic_conn,
                                        session.local_bind_addr,
                                    )),
                                    &new_hyper_ratchet,
                                    session,
                                    security_level,
                                    implicated_cid,
                                    &mut state_container,
                                );
                            }

                            // Stage 0 is pushed directly onto the primary stream here (rather than
                            // returned) because this arm continues with the hole punch below
                            let stage0_preconnect_packet =
                                packet_crafter::pre_connect::craft_stage0(
                                    &new_hyper_ratchet,
                                    timestamp,
                                    local_node_type,
                                    security_level,
                                );
                            let to_primary_stream = return_if_none!(
                                session.to_primary_stream.clone(),
                                "Primary stream not loaded"
                            );
                            to_primary_stream.unbounded_send(stage0_preconnect_packet)?;

                            //let hole_puncher = SingleUDPHolePuncher::new_initiator(session.local_nat_type.clone(), generate_hole_punch_crypt_container(new_hyper_ratchet.clone(), SecurityLevel::Standard), nat_type, local_bind_addr, server_external_addr, server_internal_addr).ok()?;
                            let stream = ReliableOrderedCompatStream::new(
                                to_primary_stream,
                                &mut state_container,
                                C2S_ENCRYPTION_ONLY,
                                new_hyper_ratchet.clone(),
                                security_level,
                            );
                            (stream, new_hyper_ratchet)
                        } else {
                            log::error!(target: "citadel", "Invalid SYN_ACK");
                            return Ok(PrimaryProcessorResult::Void);
                        }
                    } else {
                        log::error!(target: "citadel", "Expected stage SYN_ACK, but local state was not valid");
                        return Ok(PrimaryProcessorResult::Void);
                    }
                };

                let conn = &(NetworkEndpoint::register(RelativeNodeType::Initiator, stream)
                    .await
                    .map_err(|err| NetworkError::Generic(err.to_string()))?);
                log::trace!(target: "citadel", "Initiator created");
                let res = conn
                    .begin_udp_hole_punch(generate_hole_punch_crypt_container(
                        new_hyper_ratchet.clone(),
                        SecurityLevel::Standard,
                        C2S_ENCRYPTION_ONLY,
                    ))
                    .await;

                match res {
                    Ok(ret) => {
                        log::trace!(target: "citadel", "Initiator finished NAT traversal ...");
                        send_success_as_initiator(
                            Some(get_raw_udp_interface(ret)),
                            &new_hyper_ratchet,
                            session,
                            security_level,
                            implicated_cid,
                            &mut inner_mut_state!(session.state_container),
                        )
                    }

                    Err(err) => {
                        // A failed hole punch is not fatal: the success path is still taken,
                        // only without a UDP interface
                        log::warn!(target: "citadel", "Hole punch attempt failed {:?}", err.to_string());
                        send_success_as_initiator(
                            None,
                            &new_hyper_ratchet,
                            session,
                            security_level,
                            implicated_cid,
                            &mut inner_mut_state!(session.state_container),
                        )
                    }
                }
            }

            packet_flags::cmd::aux::do_preconnect::STAGE0 => {
                log::trace!(target: "citadel", "RECV STAGE 0 PRE_CONNECT PACKET");

                let implicated_cid = header.session_cid.get();
                // As in the SYN_ACK arm, `state_container` is confined to this block and is
                // dropped before the `.await`s that follow it
                let (hyper_ratchet, stream) = {
                    let mut state_container = inner_mut_state!(session.state_container);
                    // At this point, the user's static-key identity has been verified. We can now check the online status to ensure no double-logins
                    let hyper_ratchet = return_if_none!(
                        get_proper_hyper_ratchet(
                            header.drill_version.get(),
                            &state_container,
                            None
                        ),
                        "HR version not found"
                    );

                    if state_container.pre_connect_state.last_stage
                        == packet_flags::cmd::aux::do_preconnect::SYN_ACK
                    {
                        if validation::pre_connect::validate_stage0(&hyper_ratchet, packet)
                            .is_some()
                        {
                            let timestamp = session.time_tracker.get_global_time_ns();

                            //let peer_nat_type = return_if_none!(session.adjacent_nat_type.clone(), "adjacent NAT type not loaded");
                            //let peer_accessible = peer_nat_type.predict_external_addr_from_local_bind_port(0).is_some();

                            if state_container.udp_mode == UdpMode::Disabled {
                                // since this node is the server, send a BEGIN CONNECT signal to alice
                                // We have to modify the state to ensure that this node can receive a DO_CONNECT packet
                                state_container.pre_connect_state.success = true;
                                let packet = packet_crafter::pre_connect::craft_begin_connect(
                                    &hyper_ratchet,
                                    timestamp,
                                    security_level,
                                );
                                return Ok(PrimaryProcessorResult::ReplyToSender(packet));
                            } // .. otherwise, continue logic below to punch a hole through the firewall

                            //let _peer_internal_addr = session.implicated_user_p2p_internal_listener_addr.clone()?;
                            let to_primary_stream = return_if_none!(
                                session.to_primary_stream.clone(),
                                "Primary stream not loaded"
                            );

                            let stream = ReliableOrderedCompatStream::new(
                                to_primary_stream,
                                &mut state_container,
                                C2S_ENCRYPTION_ONLY,
                                hyper_ratchet.clone(),
                                security_level,
                            );
                            (hyper_ratchet, stream)
                        } else {
                            log::error!(target: "citadel", "Unable to validate stage 0 packet");
                            return Ok(PrimaryProcessorResult::Void);
                        }
                    } else {
                        log::error!(target: "citadel", "Packet state 0, last stage not 0. Dropping");
                        return Ok(PrimaryProcessorResult::Void);
                    }
                };

                let conn = &(NetworkEndpoint::register(RelativeNodeType::Receiver, stream)
                    .await
                    .map_err(|err| NetworkError::Generic(err.to_string()))?);
                log::trace!(target: "citadel", "Receiver created");

                let res = conn
                    .begin_udp_hole_punch(generate_hole_punch_crypt_container(
                        hyper_ratchet.clone(),
                        SecurityLevel::Standard,
                        C2S_ENCRYPTION_ONLY,
                    ))
                    .await;

                match res {
                    Ok(ret) => handle_success_as_receiver(
                        Some(get_raw_udp_interface(ret)),
                        session,
                        implicated_cid,
                        &mut inner_mut_state!(session.state_container),
                    ),

                    Err(err) => {
                        log::warn!(target: "citadel", "Hole punch attempt failed ({}). Will fallback to TCP only mode. Will await for adjacent node to continue exchange", err.to_string());
                        // We await the initiator to choose a method
                        let mut state_container = inner_mut_state!(session.state_container);
                        state_container.udp_mode = UdpMode::Disabled;
                        state_container.pre_connect_state.last_stage =
                            packet_flags::cmd::aux::do_preconnect::SUCCESS;
                        Ok(PrimaryProcessorResult::Void)
                    }
                }
            }

            // Alice (initiator) sends this to Bob (receiver)
            packet_flags::cmd::aux::do_preconnect::SUCCESS
            | packet_flags::cmd::aux::do_preconnect::FAILURE => {
                let success = header.cmd_aux == packet_flags::cmd::aux::do_preconnect::SUCCESS;

                if success {
                    log::trace!(target: "citadel", "RECV STAGE SUCCESS PRE CONNECT PACKET");
                } else {
                    log::trace!(target: "citadel", "RECV STAGE FAILURE PRE CONNECT PACKET");
                }

                let timestamp = session.time_tracker.get_global_time_ns();
                let mut state_container = inner_mut_state!(session.state_container);
                let hr = return_if_none!(
                    get_proper_hyper_ratchet(header_drill_vers, &state_container, None),
                    "Could not get proper HR [preconnect0]"
                );
                let cnac = &(return_if_none!(state_container.cnac.clone(), "Sess CNAC not loaded"));
                // Read from the parsed header before `packet` is consumed by `decompose`
                let tcp_only = header.algorithm == payload_identifiers::do_preconnect::TCP_ONLY;
                let (header, packet, ..) = packet.decompose();
                if let Some((header, _, hyper_ratchet)) =
                    validation::aead::validate(hr, &header, packet)
                {
                    state_container.pre_connect_state.success = true;
                    if !success {
                        state_container.udp_mode = UdpMode::Disabled;
                    }

                    // if we are using tcp_only, skip the rest and go straight to sending the packet
                    if tcp_only {
                        log::warn!(target: "citadel", "Received signal to fall-back to TCP only mode");
                        let begin_connect = packet_crafter::pre_connect::craft_begin_connect(
                            &hyper_ratchet,
                            timestamp,
                            security_level,
                        );
                        return Ok(PrimaryProcessorResult::ReplyToSender(begin_connect));
                    }

                    // another check. If we are already using a QUIC connection for the primary stream, AND we are using UDP mode, then this
                    // server node will need to mirror the opposite side and setup a UDP conn internally
                    if state_container.udp_mode == UdpMode::Enabled {
                        if let Some(quic_conn) = inner_mut!(session.primary_stream_quic_conn).take()
                        {
                            log::trace!(target: "citadel", "[Server/QUIC-UDP] Loading ...");
                            let _ = handle_success_as_receiver(
                                Some(get_quic_udp_interface(quic_conn, session.local_bind_addr)),
                                session,
                                header.session_cid.get(),
                                &mut state_container,
                            )?;
                        }
                    }

                    // if we aren't using tcp only, and, failed, end the session
                    // (`tcp_only` is always false here because of the early return above)
                    if !tcp_only && !success {
                        let ticket = state_container
                            .pre_connect_state
                            .ticket
                            .unwrap_or_else(|| session.kernel_ticket.get());
                        // Drop the state container before notifying the kernel
                        std::mem::drop(state_container);
                        //session.needs_close_message.set(false);
                        session.send_to_kernel(NodeResult::ConnectFail(ConnectFail {
                            ticket,
                            cid_opt: Some(cnac.get_cid()),
                            error_message: "Preconnect stage failed".to_string(),
                        }))?;
                        Ok(PrimaryProcessorResult::EndSession(
                            "Failure packet received",
                        ))
                    } else {
                        let begin_connect = packet_crafter::pre_connect::craft_begin_connect(
                            &hyper_ratchet,
                            timestamp,
                            security_level,
                        );
                        Ok(PrimaryProcessorResult::ReplyToSender(begin_connect))
                    }
                } else {
                    log::error!(target: "citadel", "Unable to validate success packet. Dropping");
                    Ok(PrimaryProcessorResult::Void)
                }
            }

            // the client gets this. The client must now begin the connect process
            packet_flags::cmd::aux::do_preconnect::BEGIN_CONNECT => {
                log::trace!(target: "citadel", "RECV STAGE BEGIN_CONNECT PRE CONNECT PACKET");
                let mut state_container = inner_mut_state!(session.state_container);
                let hr = return_if_none!(
                    get_proper_hyper_ratchet(header_drill_vers, &state_container, None),
                    "Could not get proper HR [preconnect1]"
                );

                if state_container.pre_connect_state.last_stage
                    == packet_flags::cmd::aux::do_preconnect::SUCCESS
                {
                    let (header, payload, _, _) = packet.decompose();
                    if let Some((_, _, hyper_ratchet)) =
                        validation::aead::validate(hr, &header, payload)
                    {
                        state_container.pre_connect_state.success = true;
                        // `begin_connect_process` acquires the state container itself, so it
                        // is dropped here first
                        std::mem::drop(state_container);
                        // now, begin stage 0 connect
                        begin_connect_process(session, &hyper_ratchet, security_level)
                    } else {
                        log::error!(target: "citadel", "Unable to validate success_ack packet. Dropping");
                        Ok(PrimaryProcessorResult::Void)
                    }
                } else {
                    log::error!(target: "citadel", "Last stage is not SUCCESS, yet a BEGIN_CONNECT packet was received. Dropping");
                    Ok(PrimaryProcessorResult::Void)
                }
            }

            packet_flags::cmd::aux::do_preconnect::HALT => {
                // The payload is treated as the error text; invalid UTF-8 gets a placeholder
                let message =
                    String::from_utf8(payload.to_vec()).unwrap_or_else(|_| "INVALID UTF-8".into());
                let ticket = session.kernel_ticket.get();
                session.send_to_kernel(NodeResult::ConnectFail(ConnectFail {
                    ticket,
                    cid_opt: Some(header.session_cid.get()),
                    error_message: message,
                }))?;
                //session.needs_close_message.set(false);
                Ok(PrimaryProcessorResult::EndSession(
                    "Preconnect signalled to halt",
                ))
            }

            _ => {
                log::error!(target: "citadel", "Invalid auxiliary command");
                Ok(PrimaryProcessorResult::Void)
            }
        }
    };

    // NOTE(review): the task is handed to the `to_concurrent_processor!` macro, whose
    // expansion is not visible in this file; confirm how it drives the future
    to_concurrent_processor!(task)
}

/// Starts the connect (login) stage once the pre-connect exchange has finished.
///
/// Takes the proposed credentials out of the state container, crafts the stage-0
/// `do_connect` packet from them, records `STAGE1` as the connect state's last stage,
/// and then stores `SessionState::ConnectionProcess` as the session state. The crafted
/// packet is handed back as a `ReplyToSender` result.
///
/// If no proposed credentials are stored, `return_if_none!` leaves the function early
/// (see that macro for the exact value returned).
fn begin_connect_process(
    session: &HdpSession,
    hyper_ratchet: &StackedRatchet,
    security_level: SecurityLevel,
) -> Result<PrimaryProcessorResult, NetworkError> {
    // The session keys have already been re-established by now; only the login stage
    // remains to be started.
    let outbound_packet = {
        let mut container = inner_mut_state!(session.state_container);
        let now_ns = session.time_tracker.get_global_time_ns();
        let credentials = return_if_none!(
            container.connect_state.proposed_credentials.take(),
            "Proposed creds not loaded"
        );

        let crafted = crate::proto::packet_crafter::do_connect::craft_stage0_packet(
            hyper_ratchet,
            credentials,
            now_ns,
            security_level,
        );
        container.connect_state.last_stage = packet_flags::cmd::aux::do_connect::STAGE1;

        // `container` goes out of scope at the end of this block, i.e. before the
        // session state is switched below
        crafted
    };

    session
        .state
        .store(SessionState::ConnectionProcess, Ordering::Relaxed);

    log::trace!(target: "citadel", "Successfully sent stage0 connect packet outbound");

    // The session stays open across the transition from pre-connect to connect
    Ok(PrimaryProcessorResult::ReplyToSender(outbound_packet))
}

/// Runs the receiver-side success handling and then replies with the final
/// pre-connect stage packet.
///
/// Any error from `handle_success_as_receiver` is propagated; its success value is
/// discarded. The reply packet is built by `craft_stage_final` using the session's
/// current global time.
fn send_success_as_initiator(
    udp_splittable: Option<UdpSplittableTypes>,
    hyper_ratchet: &StackedRatchet,
    session: &HdpSession,
    security_level: SecurityLevel,
    implicated_cid: u64,
    state_container: &mut StateContainerInner,
) -> Result<PrimaryProcessorResult, NetworkError> {
    // Only the error path of the shared handler matters here.
    let _ = handle_success_as_receiver(udp_splittable, session, implicated_cid, state_container)?;

    let timestamp = session.time_tracker.get_global_time_ns();
    let final_stage_packet = packet_crafter::pre_connect::craft_stage_final(
        hyper_ratchet,
        true,
        false,
        timestamp,
        security_level,
    );

    Ok(PrimaryProcessorResult::ReplyToSender(final_stage_packet))
}

/// Marks the pre-connect stage as successful and, when a UDP splittable is supplied,
/// hands it to `HdpSession::udp_socket_loader`.
///
/// The pre-connect state's last stage is set to `SUCCESS`, and a default
/// `UdpChannelSender` is installed if the existing one has no sender. Without a UDP
/// splittable only a warning is logged. Every visible path returns
/// `PrimaryProcessorResult::Void`.
fn handle_success_as_receiver(
    udp_splittable: Option<UdpSplittableTypes>,
    session: &HdpSession,
    implicated_cid: u64,
    state_container: &mut StateContainerInner,
) -> Result<PrimaryProcessorResult, NetworkError> {
    let tcp_loaded_alerter_rx = state_container.setup_tcp_alert_if_udp_c2s();

    let pre_connect_state = &mut state_container.pre_connect_state;
    pre_connect_state.last_stage = packet_flags::cmd::aux::do_preconnect::SUCCESS;
    pre_connect_state.on_packet_received();

    let udp_sender_missing = pre_connect_state.udp_channel_oneshot_tx.tx.is_none();
    if udp_sender_missing {
        // TODO ensure this exists BEFORE udp socket loading
        pre_connect_state.udp_channel_oneshot_tx = UdpChannelSender::default();
    }

    match udp_splittable {
        Some(udp_splittable) => {
            let peer_addr = udp_splittable.peer_addr();
            // the UDP subsystem will automatically engage at this point
            HdpSession::udp_socket_loader(
                session.clone(),
                VirtualTargetType::LocalGroupServer(implicated_cid),
                udp_splittable,
                peer_addr,
                session.kernel_ticket.get(),
                Some(tcp_loaded_alerter_rx),
            );
        }

        None => {
            log::warn!(target: "citadel", "No UDP splittable was specified. UdpMode: {:?}", state_container.udp_mode);
        }
    }

    // the server will await for the client to send an initiation packet
    Ok(PrimaryProcessorResult::Void)
}

/// Builds an `EncryptedConfigContainer` from two closures backed by the given ratchet.
///
/// The first closure forwards plaintext to `packet_crafter::hole_punch::generate_packet`
/// (with `security_level` and `target_cid`); the second forwards a packet to
/// `packet_crafter::hole_punch::decrypt_packet` (with `security_level`).
pub(crate) fn generate_hole_punch_crypt_container(
    hyper_ratchet: StackedRatchet,
    security_level: SecurityLevel,
    target_cid: u64,
) -> EncryptedConfigContainer {
    // Each closure takes ownership of its own ratchet handle.
    let decryption_ratchet = hyper_ratchet.clone();
    let encryption_ratchet = hyper_ratchet;

    EncryptedConfigContainer::new(
        move |plaintext| {
            packet_crafter::hole_punch::generate_packet(
                &encryption_ratchet,
                plaintext,
                security_level,
                target_cid,
            )
        },
        move |packet| {
            packet_crafter::hole_punch::decrypt_packet(&decryption_ratchet, packet, security_level)
        },
    )
}

/// Returns the instant in time when the sync_time happens, and the inscribable i64 thereof.
///
/// The absolute difference between `current` and `header` is treated as the ping, in
/// nanoseconds. The sync point lies `HOLE_PUNCH_SYNC_TIME_MULTIPLIER * ping` after
/// `current`.
///
/// NOTE(review): `header` is presumably the timestamp carried by the received packet
/// header, i.e. remote-controlled input — confirm against callers. The arithmetic below
/// is therefore overflow-safe for any pair of `i64` values.
pub fn calculate_sync_time(current: i64, header: i64) -> (Instant, i64) {
    // `abs_diff` yields the exact distance as a `u64`; `i64::abs(current - header)` could
    // overflow both in the subtraction and in the `abs` (panicking in debug builds).
    let ping = current.abs_diff(header);
    // float -> int `as` casts saturate, so `delta` stays within `i64` even for a huge ping
    let delta = (HOLE_PUNCH_SYNC_TIME_MULTIPLIER * (ping as f64)) as i64;
    // we send this timestamp, allowing the other end to begin the hole-punching process once this moment is reached
    let sync_time_ns = current.saturating_add(delta);
    log::trace!(target: "citadel", "Sync time: {}", sync_time_ns);
    let sync_time_instant = Instant::now() + Duration::from_nanos(delta as u64);
    (sync_time_instant, sync_time_ns)
}

/// Reports whether the adjacent node's protocol version is incompatible with ours.
///
/// Returns `Ok(true)` when the major or the minor component differs from
/// `MAJOR_VERSION` / `MINOR_VERSION`; the patch component is not compared.
///
/// # Errors
/// Returns `NetworkError::InvalidRequest` when the `u32` cannot be decoded as a semver.
fn proto_version_out_of_sync(adjacent_proto_version: u32) -> Result<bool, NetworkError> {
    use embedded_semver::Semver;

    let their_version = Semver::from_u32(adjacent_proto_version)
        .map_err(|_| NetworkError::InvalidRequest("Unable to parse incoming protocol semver"))?;

    // if either major or minor releases are not equal, assume breaking change
    let same_major = their_version.major == crate::constants::MAJOR_VERSION as usize;
    let same_minor = their_version.minor == crate::constants::MINOR_VERSION as usize;

    Ok(!(same_major && same_minor))
}

/// Wraps a hole-punched UDP socket in the `Raw` variant of `UdpSplittableTypes`,
/// pairing the socket with its send address.
fn get_raw_udp_interface(socket: HolePunchedUdpSocket) -> UdpSplittableTypes {
    log::trace!(target: "citadel", "Will use Raw UDP for UDP transmission");
    let udp_socket = socket.socket;
    let send_address = socket.addr.send_address;
    let connector = RawUdpSocketConnector::new(udp_socket, send_address);
    UdpSplittableTypes::Raw(connector)
}

/// Wraps a QUIC connection and its local address in the `Quic` variant of
/// `UdpSplittableTypes`.
fn get_quic_udp_interface(quic_conn: Connection, local_addr: SocketAddr) -> UdpSplittableTypes {
    log::trace!(target: "citadel", "Will use QUIC UDP for UDP transmission");
    let connector = QuicUdpSocketConnector::new(quic_conn, local_addr);
    UdpSplittableTypes::Quic(connector)
}

#[cfg(test)]
mod tests {
    use crate::constants::PROTOCOL_VERSION;
    use crate::proto::packet_processor::preconnect_packet::proto_version_out_of_sync;
    use embedded_semver::Semver;

    /// Encodes our own protocol version with each component bumped by the given amount.
    fn shifted_version(major_shift: usize, minor_shift: usize, patch_shift: usize) -> u32 {
        let our_version = Semver::from_u32(*PROTOCOL_VERSION).unwrap();
        Semver::new(
            our_version.major + major_shift,
            our_version.minor + minor_shift,
            our_version.patch + patch_shift,
        )
        .to_u32()
        .unwrap()
    }

    /// A differing patch component alone must not count as out of sync.
    #[test]
    fn test_good_version() {
        for shift in 1..3 {
            assert!(!proto_version_out_of_sync(shifted_version(0, 0, shift)).unwrap());
        }
    }

    /// A differing major component must count as out of sync.
    #[test]
    fn test_bad_major_version() {
        for shift in 1..3 {
            assert!(proto_version_out_of_sync(shifted_version(shift, 0, 0)).unwrap());
        }
    }

    /// A differing minor component must count as out of sync.
    #[test]
    fn test_bad_minor_version() {
        for shift in 1..3 {
            assert!(proto_version_out_of_sync(shifted_version(0, shift, 0)).unwrap());
        }
    }

    /// A value that cannot be decoded as a semver must yield an error.
    #[test]
    fn test_bad_parse() {
        assert!(proto_version_out_of_sync(u32::MAX).is_err());
    }
}