1use std::process::exit;
2use std::thread::sleep;
3
4use a653rs::bindings::*;
5use a653rs::prelude::{Name, SystemTime};
6use nix::libc::EAGAIN;
7
8use a653rs_linux_core::error::SystemError;
9use a653rs_linux_core::queuing::{QueuingDestination, QueuingSource};
10use a653rs_linux_core::sampling::{SamplingDestination, SamplingSource};
11
12use crate::partition::ApexLinuxPartition;
13use crate::process::Process as LinuxProcess;
14use crate::*;
15
16impl ApexPartitionP4 for ApexLinuxPartition {
17 fn get_partition_status() -> ApexPartitionStatus {
18 let operating_mode = PARTITION_MODE.read().unwrap();
19
20 ApexPartitionStatus {
21 period: CONSTANTS.period.as_nanos() as i64,
22 duration: CONSTANTS.duration.as_nanos() as i64,
23 identifier: CONSTANTS.identifier,
24 lock_level: 0,
25 operating_mode,
26 start_condition: CONSTANTS.start_condition,
27 num_assigned_cores: 1,
28 }
29 }
30
31 fn set_partition_mode(operating_mode: OperatingMode) -> Result<(), ErrorReturnCode> {
32 let current_mode = PARTITION_MODE.read().unwrap();
33
34 if let OperatingMode::Idle = current_mode {
35 panic!()
36 }
37
38 match (operating_mode, current_mode) {
39 (OperatingMode::Normal, OperatingMode::Normal) => Err(ErrorReturnCode::NoAction),
40 (OperatingMode::WarmStart, OperatingMode::ColdStart) => {
41 Err(ErrorReturnCode::InvalidMode)
42 }
43 (OperatingMode::Normal, _) => {
44 SENDER
45 .try_send(&PartitionCall::Transition(operating_mode))
46 .unwrap();
47 loop {
48 sleep(Duration::from_secs(500))
49 }
50 }
51 (_, _) => {
52 SENDER
53 .try_send(&PartitionCall::Transition(operating_mode))
54 .unwrap();
55 exit(0)
56 }
57 }
58 }
59}
60
61impl ApexProcessP4 for ApexLinuxPartition {
62 fn create_process(attributes: &ApexProcessAttribute) -> Result<ProcessId, ErrorReturnCode> {
63 let attr = attributes.clone().into();
66 Ok(LinuxProcess::create(attr).unwrap())
67 }
68
69 fn start(process_id: ProcessId) -> Result<(), ErrorReturnCode> {
70 let proc = match process_id {
71 1 => APERIODIC_PROCESS.get(),
72 2 => PERIODIC_PROCESS.get(),
73 _ => None,
74 };
75
76 let proc = match proc {
77 Some(proc) => proc,
78 None => return Err(ErrorReturnCode::InvalidParam),
79 };
80
81 proc.start().unwrap();
83
84 Ok(())
85 }
86}
87
88impl ApexSamplingPortP4 for ApexLinuxPartition {
89 fn create_sampling_port(
90 sampling_port_name: SamplingPortName,
91 _max_message_size: MessageSize,
93 port_direction: PortDirection,
94 refresh_period: ApexSystemTime,
95 ) -> Result<SamplingPortId, ErrorReturnCode> {
96 if refresh_period <= 0 {
97 trace!("yielding InvalidConfig, because refresh period <= 0");
98 return Err(ErrorReturnCode::InvalidConfig);
99 }
100
101 let name = Name::new(sampling_port_name);
102 let name = name.to_str().map_err(|e| {
103 trace!("yielding InvalidConfig, because sampling port is not valid UTF-8:\n{e}");
104 ErrorReturnCode::InvalidConfig
105 })?;
106 if let Some((i, s)) = CONSTANTS
107 .sampling
108 .iter()
109 .enumerate()
110 .find(|(_, s)| s.name.eq(name))
111 {
112 if s.dir != port_direction {
113 trace!("yielding InvalidConfig, because mismatching port direction:\nexpected {:?}, got {port_direction:?}", s.dir);
114 return Err(ErrorReturnCode::InvalidConfig);
115 }
116
117 let refresh = SystemTime::new(refresh_period).unwrap_duration();
118 let ch = (i, refresh);
119
120 let mut channels = SAMPLING_PORTS.read().unwrap();
121 if channels.try_push(ch).is_some() {
122 trace!(
123 "yielding InvalidConfig, maximum number of sampling ports already reached: {}",
124 channels.len()
125 );
126 return Err(ErrorReturnCode::InvalidConfig);
127 }
128 SAMPLING_PORTS.write(&channels).unwrap();
129
130 return Ok(channels.len() as SamplingPortId);
131 }
132
133 trace!("yielding InvalidConfig, configuration does not declare sampling port {name}");
134 Err(ErrorReturnCode::InvalidConfig)
135 }
136
137 fn write_sampling_message(
138 sampling_port_id: SamplingPortId,
139 message: &[ApexByte],
140 ) -> Result<(), ErrorReturnCode> {
141 if let Some((port, _)) = SAMPLING_PORTS
142 .read()
143 .unwrap()
144 .get(sampling_port_id as usize - 1)
145 {
146 if let Some(port) = CONSTANTS.sampling.get(*port) {
147 if message.len() > port.msg_size {
148 return Err(ErrorReturnCode::InvalidConfig);
149 } else if message.is_empty() {
150 return Err(ErrorReturnCode::InvalidParam);
151 } else if port.dir != PortDirection::Source {
152 return Err(ErrorReturnCode::InvalidMode);
153 }
154 SamplingSource::try_from(port.fd).unwrap().write(message);
155 return Ok(());
156 }
157 }
158
159 Err(ErrorReturnCode::InvalidParam)
160 }
161
162 unsafe fn read_sampling_message(
163 sampling_port_id: SamplingPortId,
164 message: &mut [ApexByte],
165 ) -> Result<(Validity, MessageSize), ErrorReturnCode> {
166 let read = if let Ok(read) = SAMPLING_PORTS.read() {
167 read
168 } else {
169 return Err(ErrorReturnCode::NotAvailable);
170 };
171 if let Some((port, val)) = read.get(sampling_port_id as usize - 1) {
172 if let Some(port) = CONSTANTS.sampling.get(*port) {
173 if message.is_empty() {
174 return Err(ErrorReturnCode::InvalidParam);
175 } else if port.dir != PortDirection::Destination {
176 return Err(ErrorReturnCode::InvalidMode);
177 }
178 let (msg_len, copied) = SamplingDestination::try_from(port.fd)
179 .unwrap()
180 .read(message);
181
182 if msg_len == 0 {
183 return Err(ErrorReturnCode::NoAction);
184 }
185
186 let valid = if copied.elapsed() <= *val {
187 Validity::Valid
188 } else {
189 Validity::Invalid
190 };
191
192 return Ok((valid, msg_len as u32));
193 }
194 }
195
196 Err(ErrorReturnCode::InvalidParam)
197 }
198}
199
200impl ApexQueuingPortP4 for ApexLinuxPartition {
201 fn create_queuing_port(
202 queuing_port_name: QueuingPortName,
203 max_message_size: MessageSize,
204 max_nb_message: MessageRange,
205 port_direction: PortDirection,
206 _queuing_discipline: QueuingDiscipline,
207 ) -> Result<QueuingPortId, ErrorReturnCode> {
208 let name = Name::new(queuing_port_name);
209 let name = name.to_str().map_err(|e| {
210 trace!("yielding InvalidConfig, because queuing port is not valid UTF-8:\n{e}");
211 ErrorReturnCode::InvalidConfig
212 })?;
213
214 if let Some((i, q)) = CONSTANTS
215 .queuing
216 .iter()
217 .enumerate()
218 .find(|(_, q)| q.name.eq(name))
219 {
220 if max_message_size != q.msg_size as MessageSize {
222 trace!("yielding InvalidConfig, because the queuing port max message size ({}) mismatches the configuration table value ({})", max_message_size, q.msg_size);
223 return Err(ErrorReturnCode::InvalidConfig);
224 }
225
226 if max_nb_message != q.max_num_msg as MessageRange {
228 trace!("yielding InvalidConfig, because the queuing port max number of messages ({}) mismatches the configuration table value ({})", max_nb_message, q.max_num_msg);
229 return Err(ErrorReturnCode::InvalidConfig);
230 }
231
232 if q.dir != port_direction {
234 trace!("yielding InvalidConfig, because queuing port has mismatching port direction:\nexpected {:?}, got {port_direction:?}", q.dir);
235 return Err(ErrorReturnCode::InvalidConfig);
236 }
237
238 if let OperatingMode::Normal = PARTITION_MODE.read().unwrap() {
240 trace!("yielding InvalidMode, because queuing port creation is not allowed in normal mode");
241 return Err(ErrorReturnCode::InvalidMode);
242 }
243
244 let ch = i;
245
246 let mut channels = QUEUING_PORTS.read().unwrap();
247
248 if channels.contains(&ch) {
250 trace!("yielding NoAction, because queuing port has already been created");
251 return Err(ErrorReturnCode::NoAction);
252 }
253
254 if channels.try_push(ch).is_some() {
256 trace!(
257 "yielding InvalidConfig, maximum number of queuing ports (={}) already reached",
258 channels.len()
259 );
260 return Err(ErrorReturnCode::InvalidConfig);
261 }
262 QUEUING_PORTS.write(&channels).unwrap();
263
264 return Ok(channels.len() as QueuingPortId);
265 }
266
267 trace!("yielding InvalidConfig, configuration does not declare queuing port {name}");
268 Err(ErrorReturnCode::InvalidConfig)
269 }
270
271 fn send_queuing_message(
272 queuing_port_id: QueuingPortId,
273 message: &[ApexByte],
274 _time_out: ApexSystemTime,
275 ) -> Result<(), ErrorReturnCode> {
276 let port = QUEUING_PORTS
277 .read()
278 .ok()
279 .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
280 .and_then(|port| CONSTANTS.queuing.get(port))
281 .ok_or(ErrorReturnCode::InvalidParam)?;
282
283 if message.len() > port.msg_size {
284 return Err(ErrorReturnCode::InvalidConfig);
285 } else if message.is_empty() {
286 return Err(ErrorReturnCode::InvalidParam);
287 } else if port.dir != PortDirection::Source {
288 return Err(ErrorReturnCode::InvalidMode);
289 }
290
291 let written_bytes = QueuingSource::try_from(port.fd)
292 .unwrap()
293 .write(message, *SYSTEM_TIME)
294 .ok_or(ErrorReturnCode::NotAvailable)?; if written_bytes < message.len() {
297 warn!(
298 "Tried to write {} bytes to queuing port, but only {} bytes could be written",
299 message.len(),
300 written_bytes
301 );
302 }
303
304 Ok(())
305 }
306
307 unsafe fn receive_queuing_message(
308 queuing_port_id: QueuingPortId,
309 _time_out: ApexSystemTime,
310 message: &mut [ApexByte],
311 ) -> Result<(MessageSize, QueueOverflow), ErrorReturnCode> {
312 let port = QUEUING_PORTS
313 .read()
314 .ok()
315 .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
316 .and_then(|port| CONSTANTS.queuing.get(port))
317 .ok_or(ErrorReturnCode::InvalidParam)?;
318
319 if message.is_empty() {
320 return Err(ErrorReturnCode::InvalidParam);
321 } else if port.dir != PortDirection::Destination {
322 return Err(ErrorReturnCode::InvalidMode);
323 }
324 let (msg_len, has_overflowed) = QueuingDestination::try_from(port.fd)
325 .unwrap()
326 .read(message)
327 .ok_or(ErrorReturnCode::NotAvailable)?; Ok((msg_len as MessageSize, has_overflowed as QueueOverflow))
331 }
332
333 fn get_queuing_port_status(
334 queuing_port_id: QueuingPortId,
335 ) -> Result<QueuingPortStatus, ErrorReturnCode> {
336 let port = QUEUING_PORTS
337 .read()
338 .ok()
339 .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
340 .and_then(|port| CONSTANTS.queuing.get(port))
341 .ok_or(ErrorReturnCode::InvalidParam)?;
342
343 let num_msgs = match port.dir {
344 PortDirection::Source => QueuingSource::try_from(port.fd)
345 .unwrap()
346 .get_current_num_messages(),
347 PortDirection::Destination => QueuingDestination::try_from(port.fd)
348 .unwrap()
349 .get_current_num_messages(),
350 };
351
352 let status = QueuingPortStatus {
353 nb_message: num_msgs as MessageRange,
354 max_nb_message: port.max_num_msg as MessageRange,
355 max_message_size: port.msg_size as MessageSize,
356 port_direction: port.dir,
357 waiting_processes: 0,
358 };
359
360 Ok(status)
361 }
362
363 fn clear_queuing_port(queuing_port_id: QueuingPortId) -> Result<(), ErrorReturnCode> {
364 let port = QUEUING_PORTS
365 .read()
366 .ok()
367 .and_then(|ports| ports.into_iter().nth(queuing_port_id as usize - 1))
368 .and_then(|port| CONSTANTS.queuing.get(port))
369 .ok_or(ErrorReturnCode::InvalidParam)?;
370
371 if port.dir != PortDirection::Destination {
372 return Err(ErrorReturnCode::InvalidMode);
373 }
374
375 QueuingDestination::try_from(port.fd)
376 .unwrap()
377 .clear(*SYSTEM_TIME);
378
379 Ok(())
380 }
381}
382
383impl ApexTimeP4 for ApexLinuxPartition {
384 fn periodic_wait() -> Result<(), ErrorReturnCode> {
385 let proc = LinuxProcess::get_self().unwrap();
387 if !proc.periodic() {
388 return Err(ErrorReturnCode::InvalidMode);
389 }
390
391 proc.cg().unwrap().freeze().unwrap();
392 Ok(())
393 }
394
395 fn get_time() -> ApexSystemTime {
396 SYSTEM_TIME
397 .elapsed()
398 .as_nanos()
399 .clamp(0, ApexSystemTime::MAX as u128) as ApexSystemTime
400 }
401}
402
403impl ApexErrorP4 for ApexLinuxPartition {
404 fn report_application_message(message: &[ApexByte]) -> Result<(), ErrorReturnCode> {
405 if message.len() > MAX_ERROR_MESSAGE_SIZE {
406 return Err(ErrorReturnCode::InvalidParam);
407 }
408 if let Ok(msg) = std::str::from_utf8(message) {
409 if let Err(e) = SENDER.try_send(&PartitionCall::Message(msg.to_string())) {
413 if let Some(e) = e.source().downcast_ref::<std::io::Error>() {
414 if e.raw_os_error() == Some(EAGAIN) {
415 return Ok(());
416 }
417 }
418 panic!("Failed to report application message: {}", e);
419 }
420 }
421 Ok(())
422 }
423
424 fn raise_application_error(
425 error_code: ErrorCode,
426 message: &[ApexByte],
427 ) -> Result<(), ErrorReturnCode> {
428 if let ErrorCode::ApplicationError = error_code {
429 Self::report_application_message(message).unwrap();
430 Self::raise_system_error(SystemError::ApplicationError);
431 Ok(())
432 } else {
433 Err(ErrorReturnCode::InvalidParam)
434 }
435 }
436}