a653rs_linux/
apex.rs

1use std::process::exit;
2use std::thread::sleep;
3
4use a653rs::bindings::*;
5use a653rs::prelude::{Name, SystemTime};
6use nix::libc::EAGAIN;
7
8use a653rs_linux_core::error::SystemError;
9use a653rs_linux_core::queuing::{QueuingDestination, QueuingSource};
10use a653rs_linux_core::sampling::{SamplingDestination, SamplingSource};
11
12use crate::partition::ApexLinuxPartition;
13use crate::process::Process as LinuxProcess;
14use crate::*;
15
16impl ApexPartitionP4 for ApexLinuxPartition {
17    fn get_partition_status() -> ApexPartitionStatus {
18        let operating_mode = PARTITION_MODE.read().unwrap();
19
20        ApexPartitionStatus {
21            period: CONSTANTS.period.as_nanos() as i64,
22            duration: CONSTANTS.duration.as_nanos() as i64,
23            identifier: CONSTANTS.identifier,
24            lock_level: 0,
25            operating_mode,
26            start_condition: CONSTANTS.start_condition,
27            num_assigned_cores: 1,
28        }
29    }
30
31    fn set_partition_mode(operating_mode: OperatingMode) -> Result<(), ErrorReturnCode> {
32        let current_mode = PARTITION_MODE.read().unwrap();
33
34        if let OperatingMode::Idle = current_mode {
35            panic!()
36        }
37
38        match (operating_mode, current_mode) {
39            (OperatingMode::Normal, OperatingMode::Normal) => Err(ErrorReturnCode::NoAction),
40            (OperatingMode::WarmStart, OperatingMode::ColdStart) => {
41                Err(ErrorReturnCode::InvalidMode)
42            }
43            (OperatingMode::Normal, _) => {
44                SENDER
45                    .try_send(&PartitionCall::Transition(operating_mode))
46                    .unwrap();
47                loop {
48                    sleep(Duration::from_secs(500))
49                }
50            }
51            (_, _) => {
52                SENDER
53                    .try_send(&PartitionCall::Transition(operating_mode))
54                    .unwrap();
55                exit(0)
56            }
57        }
58    }
59}
60
61impl ApexProcessP4 for ApexLinuxPartition {
62    fn create_process(attributes: &ApexProcessAttribute) -> Result<ProcessId, ErrorReturnCode> {
63        // TODO do not unwrap both
64        // Check current State (only allowed in warm and cold start)
65        let attr = attributes.clone().into();
66        Ok(LinuxProcess::create(attr).unwrap())
67    }
68
69    fn start(process_id: ProcessId) -> Result<(), ErrorReturnCode> {
70        let proc = match process_id {
71            1 => APERIODIC_PROCESS.get(),
72            2 => PERIODIC_PROCESS.get(),
73            _ => None,
74        };
75
76        let proc = match proc {
77            Some(proc) => proc,
78            None => return Err(ErrorReturnCode::InvalidParam),
79        };
80
81        // TODO use a bigger result which contains both panic and non-panic errors
82        proc.start().unwrap();
83
84        Ok(())
85    }
86}
87
88impl ApexSamplingPortP4 for ApexLinuxPartition {
89    fn create_sampling_port(
90        sampling_port_name: SamplingPortName,
91        // TODO Return ErrorCode for wrong max message size
92        _max_message_size: MessageSize,
93        port_direction: PortDirection,
94        refresh_period: ApexSystemTime,
95    ) -> Result<SamplingPortId, ErrorReturnCode> {
96        if refresh_period <= 0 {
97            trace!("yielding InvalidConfig, because refresh period <= 0");
98            return Err(ErrorReturnCode::InvalidConfig);
99        }
100
101        let name = Name::new(sampling_port_name);
102        let name = name.to_str().map_err(|e| {
103            trace!("yielding InvalidConfig, because sampling port is not valid UTF-8:\n{e}");
104            ErrorReturnCode::InvalidConfig
105        })?;
106        if let Some((i, s)) = CONSTANTS
107            .sampling
108            .iter()
109            .enumerate()
110            .find(|(_, s)| s.name.eq(name))
111        {
112            if s.dir != port_direction {
113                trace!("yielding InvalidConfig, because mismatching port direction:\nexpected {:?}, got {port_direction:?}", s.dir);
114                return Err(ErrorReturnCode::InvalidConfig);
115            }
116
117            let refresh = SystemTime::new(refresh_period).unwrap_duration();
118            let ch = (i, refresh);
119
120            let mut channels = SAMPLING_PORTS.read().unwrap();
121            if channels.try_push(ch).is_some() {
122                trace!(
123                    "yielding InvalidConfig, maximum number of sampling ports already reached: {}",
124                    channels.len()
125                );
126                return Err(ErrorReturnCode::InvalidConfig);
127            }
128            SAMPLING_PORTS.write(&channels).unwrap();
129
130            return Ok(channels.len() as SamplingPortId);
131        }
132
133        trace!("yielding InvalidConfig, configuration does not declare sampling port {name}");
134        Err(ErrorReturnCode::InvalidConfig)
135    }
136
137    fn write_sampling_message(
138        sampling_port_id: SamplingPortId,
139        message: &[ApexByte],
140    ) -> Result<(), ErrorReturnCode> {
141        if let Some((port, _)) = SAMPLING_PORTS
142            .read()
143            .unwrap()
144            .get(sampling_port_id as usize - 1)
145        {
146            if let Some(port) = CONSTANTS.sampling.get(*port) {
147                if message.len() > port.msg_size {
148                    return Err(ErrorReturnCode::InvalidConfig);
149                } else if message.is_empty() {
150                    return Err(ErrorReturnCode::InvalidParam);
151                } else if port.dir != PortDirection::Source {
152                    return Err(ErrorReturnCode::InvalidMode);
153                }
154                SamplingSource::try_from(port.fd).unwrap().write(message);
155                return Ok(());
156            }
157        }
158
159        Err(ErrorReturnCode::InvalidParam)
160    }
161
162    unsafe fn read_sampling_message(
163        sampling_port_id: SamplingPortId,
164        message: &mut [ApexByte],
165    ) -> Result<(Validity, MessageSize), ErrorReturnCode> {
166        let read = if let Ok(read) = SAMPLING_PORTS.read() {
167            read
168        } else {
169            return Err(ErrorReturnCode::NotAvailable);
170        };
171        if let Some((port, val)) = read.get(sampling_port_id as usize - 1) {
172            if let Some(port) = CONSTANTS.sampling.get(*port) {
173                if message.is_empty() {
174                    return Err(ErrorReturnCode::InvalidParam);
175                } else if port.dir != PortDirection::Destination {
176                    return Err(ErrorReturnCode::InvalidMode);
177                }
178                let (msg_len, copied) = SamplingDestination::try_from(port.fd)
179                    .unwrap()
180                    .read(message);
181
182                if msg_len == 0 {
183                    return Err(ErrorReturnCode::NoAction);
184                }
185
186                let valid = if copied.elapsed() <= *val {
187                    Validity::Valid
188                } else {
189                    Validity::Invalid
190                };
191
192                return Ok((valid, msg_len as u32));
193            }
194        }
195
196        Err(ErrorReturnCode::InvalidParam)
197    }
198}
199
200impl ApexQueuingPortP4 for ApexLinuxPartition {
201    fn create_queuing_port(
202        queuing_port_name: QueuingPortName,
203        max_message_size: MessageSize,
204        max_nb_message: MessageRange,
205        port_direction: PortDirection,
206        _queuing_discipline: QueuingDiscipline,
207    ) -> Result<QueuingPortId, ErrorReturnCode> {
208        let name = Name::new(queuing_port_name);
209        let name = name.to_str().map_err(|e| {
210            trace!("yielding InvalidConfig, because queuing port is not valid UTF-8:\n{e}");
211            ErrorReturnCode::InvalidConfig
212        })?;
213
214        if let Some((i, q)) = CONSTANTS
215            .queuing
216            .iter()
217            .enumerate()
218            .find(|(_, q)| q.name.eq(name))
219        {
220            // check max message size
221            if max_message_size != q.msg_size as MessageSize {
222                trace!("yielding InvalidConfig, because the queuing port max message size ({}) mismatches the configuration table value ({})", max_message_size, q.msg_size);
223                return Err(ErrorReturnCode::InvalidConfig);
224            }
225
226            // check max number of messages
227            if max_nb_message != q.max_num_msg as MessageRange {
228                trace!("yielding InvalidConfig, because the queuing port max number of messages ({}) mismatches the configuration table value ({})", max_nb_message, q.max_num_msg);
229                return Err(ErrorReturnCode::InvalidConfig);
230            }
231
232            // check correct port direction
233            if q.dir != port_direction {
234                trace!("yielding InvalidConfig, because queuing port has mismatching port direction:\nexpected {:?}, got {port_direction:?}", q.dir);
235                return Err(ErrorReturnCode::InvalidConfig);
236            }
237
238            // check partition mode
239            if let OperatingMode::Normal = PARTITION_MODE.read().unwrap() {
240                trace!("yielding InvalidMode, because queuing port creation is not allowed in normal mode");
241                return Err(ErrorReturnCode::InvalidMode);
242            }
243
244            let ch = i;
245
246            let mut channels = QUEUING_PORTS.read().unwrap();
247
248            // check if channel already exists
249            if channels.contains(&ch) {
250                trace!("yielding NoAction, because queuing port has already been created");
251                return Err(ErrorReturnCode::NoAction);
252            }
253
254            // check if max number of channels is reached
255            if channels.try_push(ch).is_some() {
256                trace!(
257                    "yielding InvalidConfig, maximum number of queuing ports (={}) already reached",
258                    channels.len()
259                );
260                return Err(ErrorReturnCode::InvalidConfig);
261            }
262            QUEUING_PORTS.write(&channels).unwrap();
263
264            return Ok(channels.len() as QueuingPortId);
265        }
266
267        trace!("yielding InvalidConfig, configuration does not declare queuing port {name}");
268        Err(ErrorReturnCode::InvalidConfig)
269    }
270
271    fn send_queuing_message(
272        queuing_port_id: QueuingPortId,
273        message: &[ApexByte],
274        _time_out: ApexSystemTime,
275    ) -> Result<(), ErrorReturnCode> {
276        let port = QUEUING_PORTS
277            .read()
278            .ok()
279            .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
280            .and_then(|port| CONSTANTS.queuing.get(port))
281            .ok_or(ErrorReturnCode::InvalidParam)?;
282
283        if message.len() > port.msg_size {
284            return Err(ErrorReturnCode::InvalidConfig);
285        } else if message.is_empty() {
286            return Err(ErrorReturnCode::InvalidParam);
287        } else if port.dir != PortDirection::Source {
288            return Err(ErrorReturnCode::InvalidMode);
289        }
290
291        let written_bytes = QueuingSource::try_from(port.fd)
292            .unwrap()
293            .write(message, *SYSTEM_TIME)
294            .ok_or(ErrorReturnCode::NotAvailable)?; // Queue is overflowed
295
296        if written_bytes < message.len() {
297            warn!(
298                "Tried to write {} bytes to queuing port, but only {} bytes could be written",
299                message.len(),
300                written_bytes
301            );
302        }
303
304        Ok(())
305    }
306
307    unsafe fn receive_queuing_message(
308        queuing_port_id: QueuingPortId,
309        _time_out: ApexSystemTime,
310        message: &mut [ApexByte],
311    ) -> Result<(MessageSize, QueueOverflow), ErrorReturnCode> {
312        let port = QUEUING_PORTS
313            .read()
314            .ok()
315            .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
316            .and_then(|port| CONSTANTS.queuing.get(port))
317            .ok_or(ErrorReturnCode::InvalidParam)?;
318
319        if message.is_empty() {
320            return Err(ErrorReturnCode::InvalidParam);
321        } else if port.dir != PortDirection::Destination {
322            return Err(ErrorReturnCode::InvalidMode);
323        }
324        let (msg_len, has_overflowed) = QueuingDestination::try_from(port.fd)
325            .unwrap()
326            .read(message)
327            .ok_or(ErrorReturnCode::NotAvailable)?; // standard states that a length of 0 should also be set here, which the API
328                                                    // does not allow
329
330        Ok((msg_len as MessageSize, has_overflowed as QueueOverflow))
331    }
332
333    fn get_queuing_port_status(
334        queuing_port_id: QueuingPortId,
335    ) -> Result<QueuingPortStatus, ErrorReturnCode> {
336        let port = QUEUING_PORTS
337            .read()
338            .ok()
339            .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
340            .and_then(|port| CONSTANTS.queuing.get(port))
341            .ok_or(ErrorReturnCode::InvalidParam)?;
342
343        let num_msgs = match port.dir {
344            PortDirection::Source => QueuingSource::try_from(port.fd)
345                .unwrap()
346                .get_current_num_messages(),
347            PortDirection::Destination => QueuingDestination::try_from(port.fd)
348                .unwrap()
349                .get_current_num_messages(),
350        };
351
352        let status = QueuingPortStatus {
353            nb_message: num_msgs as MessageRange,
354            max_nb_message: port.max_num_msg as MessageRange,
355            max_message_size: port.msg_size as MessageSize,
356            port_direction: port.dir,
357            waiting_processes: 0,
358        };
359
360        Ok(status)
361    }
362
363    fn clear_queuing_port(queuing_port_id: QueuingPortId) -> Result<(), ErrorReturnCode> {
364        let port = QUEUING_PORTS
365            .read()
366            .ok()
367            .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
368            .and_then(|port| CONSTANTS.queuing.get(port))
369            .ok_or(ErrorReturnCode::InvalidParam)?;
370
371        if port.dir != PortDirection::Destination {
372            return Err(ErrorReturnCode::InvalidMode);
373        }
374
375        QueuingDestination::try_from(port.fd)
376            .unwrap()
377            .clear(*SYSTEM_TIME);
378
379        Ok(())
380    }
381}
382
383impl ApexTimeP4 for ApexLinuxPartition {
384    fn periodic_wait() -> Result<(), ErrorReturnCode> {
385        // TODO do not unwrap() (Maybe raise an error?);
386        let proc = LinuxProcess::get_self().unwrap();
387        if !proc.periodic() {
388            return Err(ErrorReturnCode::InvalidMode);
389        }
390
391        proc.cg().unwrap().freeze().unwrap();
392        Ok(())
393    }
394
395    fn get_time() -> ApexSystemTime {
396        SYSTEM_TIME
397            .elapsed()
398            .as_nanos()
399            .clamp(0, ApexSystemTime::MAX as u128) as ApexSystemTime
400    }
401}
402
403impl ApexErrorP4 for ApexLinuxPartition {
404    fn report_application_message(message: &[ApexByte]) -> Result<(), ErrorReturnCode> {
405        if message.len() > MAX_ERROR_MESSAGE_SIZE {
406            return Err(ErrorReturnCode::InvalidParam);
407        }
408        if let Ok(msg) = std::str::from_utf8(message) {
409            // Logging may fail temporarily, because the resource can not be written to
410            // (e.g. queue is full), but the API does not allow us any other
411            // return code than INVALID_PARAM.
412            if let Err(e) = SENDER.try_send(&PartitionCall::Message(msg.to_string())) {
413                if let Some(e) = e.source().downcast_ref::<std::io::Error>() {
414                    if e.raw_os_error() == Some(EAGAIN) {
415                        return Ok(());
416                    }
417                }
418                panic!("Failed to report application message: {}", e);
419            }
420        }
421        Ok(())
422    }
423
424    fn raise_application_error(
425        error_code: ErrorCode,
426        message: &[ApexByte],
427    ) -> Result<(), ErrorReturnCode> {
428        if let ErrorCode::ApplicationError = error_code {
429            Self::report_application_message(message).unwrap();
430            Self::raise_system_error(SystemError::ApplicationError);
431            Ok(())
432        } else {
433            Err(ErrorReturnCode::InvalidParam)
434        }
435    }
436}