citadel_proto 0.13.0

Networking library for the Citadel Protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
//! File Transfer Packet Processor for Citadel Protocol
//!
//! This module handles secure file transfer operations in the Citadel Protocol network.
//! It manages the entire file transfer lifecycle, including metadata exchange,
//! chunked transfers, error handling, and virtual filesystem operations.
//!
//! # Features
//!
//! - Secure file transfer processing
//! - File metadata handling
//! - Chunked transfer support
//! - Error reporting and recovery
//! - Virtual filesystem integration
//! - Transfer state tracking
//! - Encryption level management
//!
//! # Important Notes
//!
//! - Requires connected session state
//! - All packets must be authenticated
//! - Supports virtual target routing
//! - Handles both direct and proxied transfers
//! - Maintains transfer security levels
//!
//! # Related Components
//!
//! - `StateContainer`: Manages transfer state
//! - `VirtualFileSystem`: Handles file operations
//! - `ObjectTransferHandle`: Tracks transfer progress
//! - `SecurityLevel`: Manages encryption levels

use super::includes::*;
use crate::error::NetworkError;
use crate::prelude::{InternalServerError, ReVFSResult, Ticket};
use crate::proto::packet_crafter::file::ReVFSPullAckPacket;
use crate::proto::packet_processor::header_to_response_vconn_type;
use crate::proto::packet_processor::primary_group_packet::{
    get_orientation_safe_ratchet, get_resp_target_cid_from_header,
};
use crate::proto::{get_preferred_primary_stream, send_with_error_logging};
use citadel_crypt::ratchets::Ratchet;
use citadel_types::proto::TransferType;

#[cfg_attr(feature = "localhost-testing", tracing::instrument(
    level = "trace",
    target = "citadel",
    skip_all,
    ret,
    err,
    fields(is_server = session.is_server, src = packet.parse().unwrap().0.session_cid.get(), target = packet.parse().unwrap().0.target_cid.get()
    )
))]
pub fn process_file_packet<R: Ratchet>(
    session: &CitadelSession<R>,
    packet: HdpPacket,
    proxy_cid_info: Option<(u64, u64)>,
) -> Result<PrimaryProcessorResult, NetworkError> {
    if !session.state.is_connected() {
        return Ok(PrimaryProcessorResult::Void);
    }

    let (header, payload, _, _) = packet.decompose();

    let mut state_container = inner_mut_state!(session.state_container);
    // get the proper pqc
    let header_bytes = &header[..];
    let header = return_if_none!(Ref::new(header_bytes), "Unable to validate header layout")
        as Ref<&[u8], HdpHeader>;
    let ratchet = return_if_none!(
        get_orientation_safe_ratchet(
            header.entropy_bank_version.get(),
            &state_container,
            proxy_cid_info
        ),
        "Unable to get proper HR"
    );
    let security_level = header.security_level.into();
    let ticket: Ticket = header.context_info.get().into();
    let ts = session.time_tracker.get_global_time_ns();

    // ALL FILE packets must be authenticated
    match validation::group::validate(&ratchet, security_level, header_bytes, payload) {
        Some(payload) => {
            match header.cmd_aux {
                packet_flags::cmd::aux::file::FILE_ERROR => {
                    log::error!(target: "citadel", "RECV FILE ERROR");
                    match validation::file::validate_file_error(&header, &payload[..]) {
                        Some(payload) => {
                            let ticket: Ticket = header.context_info.get().into();
                            let target_cid = header.session_cid.get();
                            let object_id = payload.object_id;

                            if let Err(err) = state_container
                                .notify_object_transfer_handle_failure_with(
                                    target_cid,
                                    object_id,
                                    payload.error_message.clone(),
                                )
                            {
                                log::error!(target: "citadel", "Failed to notify object transfer handle failure: {err}");
                            }

                            session.send_to_kernel(NodeResult::InternalServerError(
                                InternalServerError {
                                    ticket_opt: Some(ticket),
                                    message: payload.error_message,
                                    cid_opt: session.session_cid.get(),
                                },
                            ))?;

                            Ok(PrimaryProcessorResult::Void)
                        }

                        _ => {
                            log::error!(target: "citadel", "Unable to validate payload of file error");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }
                packet_flags::cmd::aux::file::FILE_HEADER => {
                    log::trace!(target: "citadel", "RECV FILE HEADER");
                    match validation::file::validate_file_header(&header, &payload[..]) {
                        Some(payload) => {
                            let v_target = payload.virtual_target;
                            let vfm = payload.file_metadata;
                            let local_encryption_level = payload.local_encryption_level;
                            log::trace!(target: "citadel", "Declared local encryption level on file header: {local_encryption_level:?}");
                            let (target_cid, v_target_flipped) = match v_target {
                                VirtualConnectionType::LocalGroupPeer {
                                    session_cid,
                                    peer_cid: target_cid,
                                } => (
                                    session_cid,
                                    VirtualConnectionType::LocalGroupPeer {
                                        session_cid: target_cid,
                                        peer_cid: session_cid,
                                    },
                                ),

                                VirtualConnectionType::LocalGroupServer { session_cid } => {
                                    (0, VirtualConnectionType::LocalGroupServer { session_cid })
                                }

                                _ => {
                                    log::error!(target: "citadel", "HyperWAN functionality not yet enabled");
                                    return Ok(PrimaryProcessorResult::Void);
                                }
                            };

                            let preferred_primary_stream = return_if_none!(
                                get_preferred_primary_stream(&header, session, &state_container)
                            );

                            if !state_container.on_file_header_received(
                                &header,
                                v_target,
                                vfm,
                                session.account_manager.get_persistence_handler(),
                                session.state_container.clone(),
                                ratchet,
                                target_cid,
                                v_target_flipped,
                                preferred_primary_stream,
                                local_encryption_level,
                            ) {
                                log::warn!(target: "citadel", "Failed to run on_file_header_received");
                            }

                            // We do not send a rebound signal until AFTER the local user
                            // accepts the file transfer requests
                            Ok(PrimaryProcessorResult::Void)
                        }

                        _ => {
                            log::error!(target: "citadel", "Unable to validate payload of file header");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                packet_flags::cmd::aux::file::FILE_HEADER_ACK => {
                    log::trace!(target: "citadel", "RECV FILE HEADER ACK");
                    match validation::file::validate_file_header_ack(&header, &payload[..]) {
                        Some(payload) => {
                            let success = payload.success;
                            let object_id = payload.object_id;
                            let v_target = payload.virtual_target;
                            let is_p2p =
                                header.session_cid.get() != 0 && header.target_cid.get() != 0;
                            let session_cid = if is_p2p {
                                header.target_cid.get()
                            } else {
                                header.session_cid.get()
                            };
                            //let session_cid = header.session_cid.get();
                            // conclude by passing this data into the state container
                            if state_container
                                .on_file_header_ack_received(
                                    success,
                                    session_cid,
                                    header.context_info.get().into(),
                                    object_id,
                                    v_target,
                                    payload.transfer_type,
                                )
                                .is_none()
                            {
                                log::error!(target: "citadel", "on_file_header_ack_received failed. File transfer attempt invalidated");
                            }

                            Ok(PrimaryProcessorResult::Void)
                        }

                        _ => {
                            log::error!(target: "citadel", "Unable to validate FILE HEADER ACK");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                packet_flags::cmd::aux::file::REVFS_PULL => {
                    // Let A be the sender, and B be this node, the receiver.
                    // A is asking to pull its own file from B. To do this,
                    // we will send a file_header back to A with auto-accept on.
                    // This will cause the standard file transfer protocol to occur.
                    // The only extra information we need to give the adjacent endpoint
                    // is the metadata pertaining to the encryption strength used on
                    // the data.
                    match validation::file::validate_revfs_pull(&header, &payload) {
                        Some(packet) => {
                            let session = session.clone();
                            let preferred_primary_stream = return_if_none!(
                                get_preferred_primary_stream(&header, &session, &state_container)
                            );
                            let virtual_target = header_to_response_vconn_type(&header);
                            let revfs_cid = header.session_cid.get();
                            let resp_target_cid = get_resp_target_cid_from_header(&header);
                            let delete_on_pull = packet.delete_on_pull;

                            // get the real_path and security level used from the backend
                            let task = async move {
                                let response_payload = match session
                                    .account_manager
                                    .get_persistence_handler()
                                    .revfs_get_file_info(revfs_cid, packet.virtual_path)
                                    .await
                                {
                                    Ok((source, metadata)) => {
                                        let transfer_type = TransferType::FileTransfer; // use a basic file transfer since we don't need to data to be locally encrypted when sending it back
                                        let Some(local_encryption_level) =
                                            metadata.get_security_level()
                                        else {
                                            log::error!(target: "citadel", "The requested file was not designated as a RE-VFS type, yet, a metadata file existed for it");
                                            return;
                                        };

                                        match session.process_outbound_file(
                                            ticket,
                                            None,
                                            source,
                                            virtual_target,
                                            packet.security_level,
                                            transfer_type,
                                            Some(local_encryption_level),
                                            Some(metadata),
                                            move |source| {
                                                if delete_on_pull {
                                                    spawn!(citadel_io::tokio::fs::remove_file(
                                                        source
                                                    ));
                                                }
                                            },
                                        ) {
                                            Ok(_) => ReVFSPullAckPacket::Success,

                                            Err(err) => ReVFSPullAckPacket::Error {
                                                error: err.into_string(),
                                            },
                                        }
                                    }

                                    Err(err) => ReVFSPullAckPacket::Error {
                                        error: err.into_string(),
                                    },
                                };

                                // on top of spawning the file transfer subroutine prior to this,
                                // we will also send a REVFS pull ack
                                let response_packet = packet_crafter::file::craft_revfs_pull_ack(
                                    &ratchet,
                                    security_level,
                                    ticket,
                                    ts,
                                    resp_target_cid,
                                    response_payload,
                                );
                                send_with_error_logging(&preferred_primary_stream, response_packet);
                            };

                            spawn!(task);

                            Ok(PrimaryProcessorResult::Void)
                        }

                        None => {
                            log::error!(target: "citadel", "Unable to validate REVFS PULL packet");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                packet_flags::cmd::aux::file::REVFS_DELETE => {
                    log::trace!(target: "citadel", "RECV REVFS DELETE");
                    match validation::file::validate_revfs_delete(&header, &payload) {
                        Some(payload) => {
                            let virtual_path = payload.virtual_path;
                            // we use the cid of the sender, because, they are requesting to alter data here
                            let re_vfs_cid = header.session_cid.get();
                            let resp_target_cid = get_resp_target_cid_from_header(&header);
                            let pers = session.account_manager.get_persistence_handler().clone();

                            let preferred_primary_stream = return_if_none!(
                                get_preferred_primary_stream(&header, session, &state_container)
                            );

                            let task = async move {
                                let err_opt = pers
                                    .revfs_delete(re_vfs_cid, virtual_path)
                                    .await
                                    .err()
                                    .map(|e| e.into_string());
                                let response_packet = packet_crafter::file::craft_revfs_ack(
                                    &ratchet,
                                    security_level,
                                    ticket,
                                    ts,
                                    resp_target_cid,
                                    err_opt,
                                );
                                send_with_error_logging(&preferred_primary_stream, response_packet);
                            };

                            spawn!(task);

                            Ok(PrimaryProcessorResult::Void)
                        }

                        None => {
                            log::error!(target: "citadel", "Unable to validate REVFS DELETE packet");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                packet_flags::cmd::aux::file::REVFS_ACK => {
                    log::trace!(target: "citadel", "RECV REVFS ACK");
                    match validation::file::validate_revfs_ack(&header, &payload) {
                        Some(payload) => {
                            let response = NodeResult::ReVFS(ReVFSResult {
                                error_message: payload.error_msg,
                                data: None,
                                ticket,
                                session_cid: ratchet.get_cid(),
                            });

                            session.send_to_kernel(response)?;

                            Ok(PrimaryProcessorResult::Void)
                        }

                        None => {
                            log::error!(target: "citadel", "Unable to validate REVFS ACK packet");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                packet_flags::cmd::aux::file::REVFS_PULL_ACK => {
                    log::trace!(target: "citadel", "RECV REVFS PULL ACK");
                    match validation::file::validate_revfs_pull_ack(&header, &payload) {
                        Some(payload) => match payload {
                            ReVFSPullAckPacket::Success => {
                                // Iwe will not send an ReVFSResult with data quite yet.
                                // Instead, we will send it once the inbound file transfer
                                // is complete
                                Ok(PrimaryProcessorResult::Void)
                            }
                            ReVFSPullAckPacket::Error { error } => {
                                let error_signal = NodeResult::ReVFS(ReVFSResult {
                                    error_message: Some(error),
                                    data: None,
                                    ticket,
                                    session_cid: ratchet.get_cid(),
                                });

                                session.send_to_kernel(error_signal)?;

                                Ok(PrimaryProcessorResult::Void)
                            }
                        },

                        None => {
                            log::error!(target: "citadel", "Invalid REVFS PULL ACK command received");
                            Ok(PrimaryProcessorResult::Void)
                        }
                    }
                }

                _ => {
                    log::error!(target: "citadel", "Invalid REVFS ACK command received");
                    Ok(PrimaryProcessorResult::Void)
                }
            }
        }

        _ => {
            log::error!(target: "citadel", "Unable to AES-GCM validate FILE packet");
            Ok(PrimaryProcessorResult::Void)
        }
    }
}